diff --git a/src/sql/workbench/services/notebook/common/sqlSessionManager.ts b/src/sql/workbench/services/notebook/common/sqlSessionManager.ts index 60cadd5b5e..03df8ffeb1 100644 --- a/src/sql/workbench/services/notebook/common/sqlSessionManager.ts +++ b/src/sql/workbench/services/notebook/common/sqlSessionManager.ts @@ -13,11 +13,12 @@ import { IInstantiationService } from 'vs/platform/instantiation/common/instanti import Severity from 'vs/base/common/severity'; import * as Utils from 'sql/platform/connection/common/utils'; import { Deferred } from 'sql/base/common/promise'; -import { Disposable } from 'vs/base/common/lifecycle'; +import { Disposable, IDisposable } from 'vs/base/common/lifecycle'; import { IErrorMessageService } from 'sql/platform/errorMessage/common/errorMessageService'; import { ConnectionProfile } from 'sql/platform/connection/common/connectionProfile'; import { IConnectionProfile } from 'sql/platform/connection/common/interfaces'; import { escape } from 'sql/base/common/strings'; +import { BatchSummary } from 'sqlops'; export const sqlKernel: string = localize('sqlKernel', 'SQL'); export const sqlKernelError: string = localize("sqlKernelError", "SQL kernel error"); @@ -126,11 +127,12 @@ export class SqlSession implements nb.ISession { class SqlKernel extends Disposable implements nb.IKernel { private _queryRunner: QueryRunner; - private _columns: IDbColumn[]; - private _rows: DbCellValue[][]; private _currentConnection: IConnectionProfile; static kernelId: number = 0; + private _id: string; + private _future: SQLFuture; + constructor( @IConnectionManagementService private _connectionManagementService: IConnectionManagementService, @IInstantiationService private _instantiationService: IInstantiationService, @IErrorMessageService private _errorMessageService: IErrorMessageService) { @@ -138,7 +140,10 @@ class SqlKernel extends Disposable implements nb.IKernel { } public get id(): string { - return (SqlKernel.kernelId++).toString(); + if (this._id === undefined) { + this._id = (SqlKernel.kernelId++).toString(); + } + return this._id; } public get name(): string { @@ -187,7 +192,14 @@ class SqlKernel extends Disposable implements nb.IKernel { } requestExecute(content: nb.IExecuteRequest, disposeOnDone?: boolean): nb.IFuture { + let canRun: boolean = true; if (this._queryRunner) { + // Cancel any existing query + if (this._future && !this._queryRunner.hasCompleted) { + this._queryRunner.cancelQuery().then(ok => undefined, error => this._errorMessageService.showDialog(Severity.Error, sqlKernelError, error)); + // TODO when we can just show error as an output, should show an "execution canceled" error in output + this._future.handleDone(); + } this._queryRunner.runQuery(content.code); } else if (this._currentConnection) { let connectionUri = Utils.generateUri(this._currentConnection, 'notebook'); @@ -197,9 +209,18 @@ class SqlKernel extends Disposable implements nb.IKernel { this.addQueryEventListeners(this._queryRunner); this._queryRunner.runQuery(content.code); }); + } else { + canRun = false; } - return new SQLFuture(this._queryRunner); + this._future = new SQLFuture(this._queryRunner); + if (!canRun) { + // Complete early + this._future.handleDone(new Error(localize('connectionRequired', 'A connection must be chosen to run notebook cells'))); + } + + // TODO should we cleanup old future? I don't think we need to + return this._future; } requestComplete(content: nb.ICompleteRequest): Thenable { @@ -220,41 +241,54 @@ class SqlKernel extends Disposable implements nb.IKernel { }); })); this._register(queryRunner.addListener(EventType.MESSAGE, message => { + // TODO handle showing a messages output (should be updated with all messages, only changing 1 output in total) if (message.isError) { this._errorMessageService.showDialog(Severity.Error, sqlKernelError, message.message); } })); + this._register(queryRunner.addListener(EventType.BATCH_COMPLETE, batch => { + if (this._future) { + this._future.handleBatchEnd(batch); + } + })); } private async queryComplete(): Promise { - let batches = this._queryRunner.batchSets; - // currently only support 1 batch set 1 resultset - if (batches.length > 0) { - let batch = batches[0]; - if (batch.resultSetSummaries.length > 0 - && batch.resultSetSummaries[0].rowCount > 0 - ) { - let resultset = batch.resultSetSummaries[0]; - this._columns = resultset.columnInfo; - let rows: QueryExecuteSubsetResult; - try { - rows = await this._queryRunner.getQueryRows(0, resultset.rowCount, batch.id, resultset.id); - } catch (e) { - return Promise.reject(e); - } - this._rows = rows.resultSubset.rows; - } + if (this._future) { + this._future.handleDone(); } + // let batches = this._queryRunner.batchSets; + // // currently only support 1 batch set 1 resultset + // if (batches.length > 0) { + // let batch = batches[0]; + // if (batch.resultSetSummaries.length > 0 + // && batch.resultSetSummaries[0].rowCount > 0 + // ) { + // let resultset = batch.resultSetSummaries[0]; + // this._columns = resultset.columnInfo; + // let rows: QueryExecuteSubsetResult; + // try { + // rows = await this._queryRunner.getQueryRows(0, resultset.rowCount, batch.id, resultset.id); + // } catch (e) { + // return Promise.reject(e); + // } + // this._rows = rows.resultSubset.rows; + // } + // } // TODO issue #2746 should ideally show a warning inside the dialog if have no data } } export class SQLFuture extends Disposable implements FutureInternal { private _msg: nb.IMessage = undefined; + private ioHandler: nb.MessageHandler; + private doneHandler: nb.MessageHandler; + private doneDeferred = new Deferred(); constructor(private _queryRunner: QueryRunner) { super(); } + get inProgress(): boolean { return this._queryRunner && !this._queryRunner.hasCompleted; } @@ -268,28 +302,24 @@ export class SQLFuture extends Disposable implements FutureInternal { } get done(): Thenable { - let deferred = new Deferred(); - try { - if (this._queryRunner) { - this._register(this._queryRunner.onBatchEnd(e => { - let msg: nb.IShellMessage = { - channel: 'shell', - type: 'execute_reply', - content: { status: 'ok' }, - header: undefined, - metadata: {}, - parent_header: undefined - }; - this._msg = msg; - deferred.resolve(msg); - })); - } else { - deferred.resolve(); - } - } catch { - return Promise.resolve(undefined); + return this.doneDeferred.promise; + } + + public handleDone(err?: Error): void { + let msg: nb.IShellMessage = { + channel: 'shell', + type: 'execute_reply', + content: { status: 'ok' }, + header: undefined, + metadata: {}, + parent_header: undefined + }; + this._msg = msg; + if (this.doneHandler) { + this.doneHandler.handle(msg); } - return deferred.promise; + this.doneDeferred.resolve(msg); + // TODO we should reject where some failure happened? } sendInputReply(content: nb.IInputReply): void { @@ -302,12 +332,13 @@ export class SQLFuture extends Disposable implements FutureInternal { setStdInHandler(handler: nb.MessageHandler): void { // no-op } - setIOPubHandler(handler: nb.MessageHandler): void { - if (this._queryRunner) { - this._register(this._queryRunner.onBatchEnd(batch => { - let rowCount = batch.resultSetSummaries[0].rowCount > MAX_ROWS ? MAX_ROWS : batch.resultSetSummaries[0].rowCount; - this._queryRunner.getQueryRows(0, rowCount, 0, 0).then(d => { - let columns = batch.resultSetSummaries[0].columnInfo; + + public handleBatchEnd(batch: BatchSummary): void { + if (this.ioHandler) { + for (let resultSet of batch.resultSetSummaries) { + let rowCount = resultSet.rowCount > MAX_ROWS ? MAX_ROWS : resultSet.rowCount; + this._queryRunner.getQueryRows(0, rowCount, resultSet.batchId, resultSet.id).then(d => { + let columns = resultSet.columnInfo; let msg: nb.IIOPubMessage = { channel: 'iopub', @@ -319,17 +350,22 @@ export class SQLFuture extends Disposable implements FutureInternal { content: { output_type: 'execute_result', metadata: {}, - execution_count: 0, + execution_count: 1, data: { 'application/vnd.dataresource+json': this.convertToDataResource(columns, d), 'text/html': this.convertToHtmlTable(columns, d) } }, metadata: undefined, parent_header: undefined }; - handler.handle(msg); + this.ioHandler.handle(msg); }); - })); + } } } + + setIOPubHandler(handler: nb.MessageHandler): void { + this.ioHandler = handler; + } + registerMessageHook(hook: (msg: nb.IIOPubMessage) => boolean | Thenable): void { // no-op }