Fix bug where results were added to all cells, and support multiple resultsets (#3890)

- SQLKernel is the only place to listen for batch and query complete messages now
- It routes to the 1 and only future (since can only have 1 at a time
- It handles query cancelation and not-connected issues correctly
This commit is contained in:
Kevin Cunnane
2019-02-01 13:53:10 -08:00
committed by Raj
parent 5132e62045
commit 374212beaa

View File

@@ -13,11 +13,12 @@ import { IInstantiationService } from 'vs/platform/instantiation/common/instanti
import Severity from 'vs/base/common/severity';
import * as Utils from 'sql/platform/connection/common/utils';
import { Deferred } from 'sql/base/common/promise';
import { Disposable } from 'vs/base/common/lifecycle';
import { Disposable, IDisposable } from 'vs/base/common/lifecycle';
import { IErrorMessageService } from 'sql/platform/errorMessage/common/errorMessageService';
import { ConnectionProfile } from 'sql/platform/connection/common/connectionProfile';
import { IConnectionProfile } from 'sql/platform/connection/common/interfaces';
import { escape } from 'sql/base/common/strings';
import { BatchSummary } from 'sqlops';
export const sqlKernel: string = localize('sqlKernel', 'SQL');
export const sqlKernelError: string = localize("sqlKernelError", "SQL kernel error");
@@ -126,11 +127,12 @@ export class SqlSession implements nb.ISession {
class SqlKernel extends Disposable implements nb.IKernel {
private _queryRunner: QueryRunner;
private _columns: IDbColumn[];
private _rows: DbCellValue[][];
private _currentConnection: IConnectionProfile;
static kernelId: number = 0;
private _id: string;
private _future: SQLFuture;
constructor( @IConnectionManagementService private _connectionManagementService: IConnectionManagementService,
@IInstantiationService private _instantiationService: IInstantiationService,
@IErrorMessageService private _errorMessageService: IErrorMessageService) {
@@ -138,7 +140,10 @@ class SqlKernel extends Disposable implements nb.IKernel {
}
public get id(): string {
return (SqlKernel.kernelId++).toString();
if (this._id === undefined) {
this._id = (SqlKernel.kernelId++).toString();
}
return this._id;
}
public get name(): string {
@@ -187,7 +192,14 @@ class SqlKernel extends Disposable implements nb.IKernel {
}
requestExecute(content: nb.IExecuteRequest, disposeOnDone?: boolean): nb.IFuture {
let canRun: boolean = true;
if (this._queryRunner) {
// Cancel any existing query
if (this._future && !this._queryRunner.hasCompleted) {
this._queryRunner.cancelQuery().then(ok => undefined, error => this._errorMessageService.showDialog(Severity.Error, sqlKernelError, error));
// TODO when we can just show error as an output, should show an "execution canceled" error in output
this._future.handleDone();
}
this._queryRunner.runQuery(content.code);
} else if (this._currentConnection) {
let connectionUri = Utils.generateUri(this._currentConnection, 'notebook');
@@ -197,9 +209,18 @@ class SqlKernel extends Disposable implements nb.IKernel {
this.addQueryEventListeners(this._queryRunner);
this._queryRunner.runQuery(content.code);
});
} else {
canRun = false;
}
return new SQLFuture(this._queryRunner);
this._future = new SQLFuture(this._queryRunner);
if (!canRun) {
// Complete early
this._future.handleDone(new Error(localize('connectionRequired', 'A connection must be chosen to run notebook cells')));
}
// TODO should we cleanup old future? I don't think we need to
return this._future;
}
requestComplete(content: nb.ICompleteRequest): Thenable<nb.ICompleteReplyMsg> {
@@ -220,41 +241,54 @@ class SqlKernel extends Disposable implements nb.IKernel {
});
}));
this._register(queryRunner.addListener(EventType.MESSAGE, message => {
// TODO handle showing a messages output (should be updated with all messages, only changing 1 output in total)
if (message.isError) {
this._errorMessageService.showDialog(Severity.Error, sqlKernelError, message.message);
}
}));
this._register(queryRunner.addListener(EventType.BATCH_COMPLETE, batch => {
if (this._future) {
this._future.handleBatchEnd(batch);
}
}));
}
private async queryComplete(): Promise<void> {
let batches = this._queryRunner.batchSets;
// currently only support 1 batch set 1 resultset
if (batches.length > 0) {
let batch = batches[0];
if (batch.resultSetSummaries.length > 0
&& batch.resultSetSummaries[0].rowCount > 0
) {
let resultset = batch.resultSetSummaries[0];
this._columns = resultset.columnInfo;
let rows: QueryExecuteSubsetResult;
try {
rows = await this._queryRunner.getQueryRows(0, resultset.rowCount, batch.id, resultset.id);
} catch (e) {
return Promise.reject(e);
}
this._rows = rows.resultSubset.rows;
}
if (this._future) {
this._future.handleDone();
}
// let batches = this._queryRunner.batchSets;
// // currently only support 1 batch set 1 resultset
// if (batches.length > 0) {
// let batch = batches[0];
// if (batch.resultSetSummaries.length > 0
// && batch.resultSetSummaries[0].rowCount > 0
// ) {
// let resultset = batch.resultSetSummaries[0];
// this._columns = resultset.columnInfo;
// let rows: QueryExecuteSubsetResult;
// try {
// rows = await this._queryRunner.getQueryRows(0, resultset.rowCount, batch.id, resultset.id);
// } catch (e) {
// return Promise.reject(e);
// }
// this._rows = rows.resultSubset.rows;
// }
// }
// TODO issue #2746 should ideally show a warning inside the dialog if have no data
}
}
export class SQLFuture extends Disposable implements FutureInternal {
private _msg: nb.IMessage = undefined;
private ioHandler: nb.MessageHandler<nb.IIOPubMessage>;
private doneHandler: nb.MessageHandler<nb.IShellMessage>;
private doneDeferred = new Deferred<nb.IShellMessage>();
constructor(private _queryRunner: QueryRunner) {
super();
}
get inProgress(): boolean {
return this._queryRunner && !this._queryRunner.hasCompleted;
}
@@ -268,28 +302,24 @@ export class SQLFuture extends Disposable implements FutureInternal {
}
get done(): Thenable<nb.IShellMessage> {
let deferred = new Deferred<nb.IShellMessage>();
try {
if (this._queryRunner) {
this._register(this._queryRunner.onBatchEnd(e => {
let msg: nb.IShellMessage = {
channel: 'shell',
type: 'execute_reply',
content: { status: 'ok' },
header: undefined,
metadata: {},
parent_header: undefined
};
this._msg = msg;
deferred.resolve(msg);
}));
} else {
deferred.resolve();
}
} catch {
return Promise.resolve(undefined);
return this.doneDeferred.promise;
}
public handleDone(err?: Error): void {
let msg: nb.IShellMessage = {
channel: 'shell',
type: 'execute_reply',
content: { status: 'ok' },
header: undefined,
metadata: {},
parent_header: undefined
};
this._msg = msg;
if (this.doneHandler) {
this.doneHandler.handle(msg);
}
return deferred.promise;
this.doneDeferred.resolve(msg);
// TODO we should reject where some failure happened?
}
sendInputReply(content: nb.IInputReply): void {
@@ -302,12 +332,13 @@ export class SQLFuture extends Disposable implements FutureInternal {
setStdInHandler(handler: nb.MessageHandler<nb.IStdinMessage>): void {
// no-op
}
setIOPubHandler(handler: nb.MessageHandler<nb.IIOPubMessage>): void {
if (this._queryRunner) {
this._register(this._queryRunner.onBatchEnd(batch => {
let rowCount = batch.resultSetSummaries[0].rowCount > MAX_ROWS ? MAX_ROWS : batch.resultSetSummaries[0].rowCount;
this._queryRunner.getQueryRows(0, rowCount, 0, 0).then(d => {
let columns = batch.resultSetSummaries[0].columnInfo;
public handleBatchEnd(batch: BatchSummary): void {
if (this.ioHandler) {
for (let resultSet of batch.resultSetSummaries) {
let rowCount = resultSet.rowCount > MAX_ROWS ? MAX_ROWS : resultSet.rowCount;
this._queryRunner.getQueryRows(0, rowCount, resultSet.batchId, resultSet.id).then(d => {
let columns = resultSet.columnInfo;
let msg: nb.IIOPubMessage = {
channel: 'iopub',
@@ -319,17 +350,22 @@ export class SQLFuture extends Disposable implements FutureInternal {
content: <nb.IExecuteResult>{
output_type: 'execute_result',
metadata: {},
execution_count: 0,
execution_count: 1,
data: { 'application/vnd.dataresource+json': this.convertToDataResource(columns, d), 'text/html': this.convertToHtmlTable(columns, d) }
},
metadata: undefined,
parent_header: undefined
};
handler.handle(msg);
this.ioHandler.handle(msg);
});
}));
}
}
}
setIOPubHandler(handler: nb.MessageHandler<nb.IIOPubMessage>): void {
this.ioHandler = handler;
}
registerMessageHook(hook: (msg: nb.IIOPubMessage) => boolean | Thenable<boolean>): void {
// no-op
}