diff --git a/src/sql/azdata.d.ts b/src/sql/azdata.d.ts index ceb2885719..ffeac899c4 100644 --- a/src/sql/azdata.d.ts +++ b/src/sql/azdata.d.ts @@ -4904,8 +4904,6 @@ declare module 'azdata' { * * @param disposeOnDone - Whether to dispose of the future when done. * - * @param cellId - Cell id (used by queryRunner) - * * @returns A kernel future. * * #### Notes @@ -4920,7 +4918,7 @@ declare module 'azdata' { * * **See also:** [[IExecuteReply]] */ - requestExecute(content: IExecuteRequest, disposeOnDone?: boolean, cellUri?: string): IFuture; + requestExecute(content: IExecuteRequest, disposeOnDone?: boolean): IFuture; /** * Send a `complete_request` message. diff --git a/src/sql/workbench/contrib/notebook/test/electron-browser/cell.test.ts b/src/sql/workbench/contrib/notebook/test/electron-browser/cell.test.ts index 8b71471f5d..6c3a9332ee 100644 --- a/src/sql/workbench/contrib/notebook/test/electron-browser/cell.test.ts +++ b/src/sql/workbench/contrib/notebook/test/electron-browser/cell.test.ts @@ -778,7 +778,7 @@ suite('Cell Model', function (): void { test('Execute returns error status', async function (): Promise { mockKernel.setup(k => k.requiresConnection).returns(() => false); - mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { + mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { let replyMsg: nb.IExecuteReplyMsg = { content: { execution_count: 1, @@ -796,7 +796,7 @@ suite('Cell Model', function (): void { test('Execute returns abort status', async function (): Promise { mockKernel.setup(k => k.requiresConnection).returns(() => false); - mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { + mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { let replyMsg: nb.IExecuteReplyMsg = { content: { execution_count: 1, @@ -815,7 +815,7 @@ suite('Cell Model', function (): void { test('Execute throws exception', async function (): Promise { let testMsg = 'Test message'; mockKernel.setup(k => k.requiresConnection).returns(() => false); - mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { + mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { throw new Error(testMsg); }); diff --git a/src/sql/workbench/services/notebook/browser/models/cell.ts b/src/sql/workbench/services/notebook/browser/models/cell.ts index 3c891e9834..708bb8f63b 100644 --- a/src/sql/workbench/services/notebook/browser/models/cell.ts +++ b/src/sql/workbench/services/notebook/browser/models/cell.ts @@ -408,7 +408,7 @@ export class CellModel extends Disposable implements ICellModel { const future = kernel.requestExecute({ code: content, stop_on_error: true - }, false, this._cellUri.toString()); + }, false); this.setFuture(future as FutureInternal); this.fireExecutionStateChanged(); // For now, await future completion. Later we should just track and handle cancellation based on model notifications diff --git a/src/sql/workbench/services/notebook/browser/sql/sqlSessionManager.ts b/src/sql/workbench/services/notebook/browser/sql/sqlSessionManager.ts index 47496b8cb5..aaf9a86414 100644 --- a/src/sql/workbench/services/notebook/browser/sql/sqlSessionManager.ts +++ b/src/sql/workbench/services/notebook/browser/sql/sqlSessionManager.ts @@ -23,12 +23,12 @@ import { isUndefinedOrNull } from 'vs/base/common/types'; import { ILanguageMagic } from 'sql/workbench/services/notebook/browser/notebookService'; import { ITextResourcePropertiesService } from 'vs/editor/common/services/textResourceConfigurationService'; import { URI } from 'vs/base/common/uri'; +import { getUriPrefix, uriPrefixes } from 'sql/platform/connection/common/utils'; import { firstIndex } from 'vs/base/common/arrays'; import { startsWith } from 'vs/base/common/strings'; import { onUnexpectedError } from 'vs/base/common/errors'; import { FutureInternal, notebookConstants } from 'sql/workbench/services/notebook/browser/interfaces'; import { tryMatchCellMagic } from 'sql/workbench/services/notebook/browser/utils'; -import { IQueryManagementService } from 'sql/workbench/services/query/common/queryManagement'; export const sqlKernelError: string = localize("sqlKernelError", "SQL kernel error"); export const MAX_ROWS = 5000; @@ -158,7 +158,7 @@ export class SqlSession implements nb.ISession { } class SqlKernel extends Disposable implements nb.IKernel { - private _queryRunners: Map = new Map(); + private _queryRunner: QueryRunner; private _currentConnection: IConnectionProfile; private _currentConnectionProfile: ConnectionProfile; static kernelId: number = 0; @@ -167,6 +167,7 @@ class SqlKernel extends Disposable implements nb.IKernel { private _future: SQLFuture; private _executionCount: number = 0; private _magicToExecutorMap = new Map(); + private _connectionPath: string; constructor(private _path: string, @IConnectionManagementService private _connectionManagementService: IConnectionManagementService, @@ -175,11 +176,11 @@ class SqlKernel extends Disposable implements nb.IKernel { @IErrorMessageService private _errorMessageService: IErrorMessageService, @IConfigurationService private _configurationService: IConfigurationService, @ILogService private readonly logService: ILogService, - @ITextResourcePropertiesService private readonly textResourcePropertiesService: ITextResourcePropertiesService, - @IQueryManagementService private queryManagementService: IQueryManagementService + @ITextResourcePropertiesService private readonly textResourcePropertiesService: ITextResourcePropertiesService ) { super(); this.initMagics(); + this.setConnectionPath(); } private initMagics(): void { @@ -189,6 +190,29 @@ class SqlKernel extends Disposable implements nb.IKernel { } } + private setConnectionPath(): void { + if (this._path) { + let prefix = getUriPrefix(this._path); + if (!prefix || prefix === uriPrefixes.connection) { + this._connectionPath = uriPrefixes.notebook.concat(this._path); + } else if (prefix !== uriPrefixes.notebook) { + try { + let uri = URI.parse(this._path); + if (uri && uri.scheme) { + this._connectionPath = uri.toString().replace(uri.scheme, uriPrefixes.notebook); + } + } catch { + // Ignore exceptions from URI parsing + } finally { + // If _connectionPath hasn't been set yet, set _connectionPath to _path as a last resort + if (!this._connectionPath) { + this._connectionPath = this._path; + } + } + } + } + } + public get id(): string { if (this._id === undefined) { this._id = (SqlKernel.kernelId++).toString(); @@ -239,33 +263,29 @@ class SqlKernel extends Disposable implements nb.IKernel { public set connection(conn: IConnectionProfile) { this._currentConnection = conn; this._currentConnectionProfile = new ConnectionProfile(this._capabilitiesService, this._currentConnection); - this._queryRunners.clear(); + this._queryRunner = undefined; } getSpec(): Thenable { return Promise.resolve(notebookConstants.sqlKernelSpec); } - requestExecute(content: nb.IExecuteRequest, disposeOnDone?: boolean, cellUri?: string): nb.IFuture { + requestExecute(content: nb.IExecuteRequest, disposeOnDone?: boolean): nb.IFuture { let canRun: boolean = true; let code = this.getCodeWithoutCellMagic(content); - let queryRunnerUri = 'queryRunner-' + cellUri; - let queryRunner: QueryRunner | undefined = this._queryRunners.get(queryRunnerUri); - if (queryRunner) { + if (this._queryRunner) { // Cancel any existing query - if (this._future && !queryRunner.hasCompleted) { - queryRunner.cancelQuery().then(ok => undefined, error => this._errorMessageService.showDialog(Severity.Error, sqlKernelError, error)); + if (this._future && !this._queryRunner.hasCompleted) { + this._queryRunner.cancelQuery().then(ok => undefined, error => this._errorMessageService.showDialog(Severity.Error, sqlKernelError, error)); // TODO when we can just show error as an output, should show an "execution canceled" error in output this._future.handleDone().catch(err => onUnexpectedError(err)); } - queryRunner.runQuery(code).catch(err => onUnexpectedError(err)); + this._queryRunner.runQuery(code).catch(err => onUnexpectedError(err)); } else if (this._currentConnection && this._currentConnectionProfile) { - queryRunner = this._instantiationService.createInstance(QueryRunner, queryRunnerUri); - this._queryRunners.set(queryRunnerUri, queryRunner); - this.queryManagementService.registerRunner(queryRunner, queryRunnerUri); - this._connectionManagementService.connect(this._currentConnectionProfile, queryRunnerUri).then((result) => { - this.addQueryEventListeners(queryRunner); - queryRunner.runQuery(code).catch(err => onUnexpectedError(err)); + this._queryRunner = this._instantiationService.createInstance(QueryRunner, this._connectionPath); + this._connectionManagementService.connect(this._currentConnectionProfile, this._connectionPath).then((result) => { + this.addQueryEventListeners(this._queryRunner); + this._queryRunner.runQuery(code).catch(err => onUnexpectedError(err)); }).catch(err => onUnexpectedError(err)); } else { canRun = false; @@ -275,7 +295,7 @@ class SqlKernel extends Disposable implements nb.IKernel { // TODO verify this is "canonical" behavior let count = canRun ? ++this._executionCount : undefined; - this._future = new SQLFuture(queryRunner, count, this._configurationService, this.logService); + this._future = new SQLFuture(this._queryRunner, count, this._configurationService, this.logService); if (!canRun) { // Complete early this._future.handleDone(new Error(localize('connectionRequired', "A connection must be chosen to run notebook cells"))).catch(err => onUnexpectedError(err)); @@ -311,8 +331,8 @@ class SqlKernel extends Disposable implements nb.IKernel { interrupt(): Thenable { // TODO: figure out what to do with the QueryCancelResult - let runners = [...this._queryRunners.values()]; - return Promise.all(runners.map(queryRunner => queryRunner.cancelQuery())).then(); + return this._queryRunner.cancelQuery().then((cancelResult) => { + }); } private addQueryEventListeners(queryRunner: QueryRunner): void { @@ -329,14 +349,9 @@ class SqlKernel extends Disposable implements nb.IKernel { } } })); - this._register(queryRunner.onResultSet(resultSet => { - if (this._future) { - this._future.onResultSet(resultSet); - } - })); this._register(queryRunner.onBatchEnd(batch => { if (this._future) { - this._future.onBatchEnd(batch); + this._future.handleBatchEnd(batch); } })); } @@ -349,16 +364,15 @@ class SqlKernel extends Disposable implements nb.IKernel { } public async disconnect(): Promise { - this._queryRunners.forEach(async (queryRunner: QueryRunner, uri: string) => { - if (this._connectionManagementService.isConnected(uri)) { + if (this._connectionPath) { + if (this._connectionManagementService.isConnected(this._connectionPath)) { try { - await this._connectionManagementService.disconnect(uri); + await this._connectionManagementService.disconnect(this._connectionPath); } catch (err) { this.logService.error(err); } - } - }); + } return; } } @@ -370,9 +384,9 @@ export class SQLFuture extends Disposable implements FutureInternal { private doneDeferred = new Deferred(); private configuredMaxRows: number = MAX_ROWS; private _outputAddedPromises: Promise[] = []; + private _querySubsetResultMap: Map = new Map(); private _errorOccurred: boolean = false; private _stopOnError: boolean = true; - constructor( private _queryRunner: QueryRunner, private _executionCount: number | undefined, @@ -428,6 +442,7 @@ export class SQLFuture extends Disposable implements FutureInternal { this.doneHandler.handle(msg); } this.doneDeferred.resolve(msg); + this._querySubsetResultMap.clear(); } sendInputReply(content: nb.IInputReply): void { @@ -458,32 +473,28 @@ export class SQLFuture extends Disposable implements FutureInternal { } } - public onResultSet(resultSet: ResultSetSummary | ResultSetSummary[]): void { + public handleBatchEnd(batch: BatchSummary): void { if (this.ioHandler) { - this._outputAddedPromises.push(this.sendInitialResultSets(resultSet)); + this._outputAddedPromises.push(this.processResultSets(batch)); } } - public onBatchEnd(batch: BatchSummary): void { - if (this.ioHandler) { - for (let set of batch.resultSetSummaries) { - if (set.rowCount > this.configuredMaxRows) { - this.handleMessage(localize('sqlMaxRowsDisplayed', "Displaying Top {0} rows.", this.configuredMaxRows)); - } - } - } - } - - private async sendInitialResultSets(resultSet: ResultSetSummary | ResultSetSummary[]): Promise { + private async processResultSets(batch: BatchSummary): Promise { try { - let resultsToAdd: ResultSetSummary[]; - if (!Array.isArray(resultSet)) { - resultsToAdd = [resultSet]; - } else { - resultsToAdd = resultSet?.splice(0); + let queryRowsPromises: Promise[] = []; + for (let resultSet of batch.resultSetSummaries) { + let rowCount = resultSet.rowCount > this.configuredMaxRows ? this.configuredMaxRows : resultSet.rowCount; + if (rowCount === this.configuredMaxRows) { + this.handleMessage(localize('sqlMaxRowsDisplayed', "Displaying Top {0} rows.", rowCount)); + } + queryRowsPromises.push(this.getAllQueryRows(rowCount, resultSet)); } - for (let set of resultsToAdd) { - this.sendIOPubMessage(set, false); + // We want to display table in the same order + let i = 0; + for (let resultSet of batch.resultSetSummaries) { + await queryRowsPromises[i]; + this.sendResultSetAsIOPub(resultSet); + i++; } } catch (err) { // TODO should we output this somewhere else? @@ -491,7 +502,31 @@ export class SQLFuture extends Disposable implements FutureInternal { } } - private sendIOPubMessage(resultSet: ResultSetSummary, conversionComplete?: boolean, subsetResult?: ResultSetSubset): void { + private async getAllQueryRows(rowCount: number, resultSet: ResultSetSummary): Promise { + let deferred: Deferred = new Deferred(); + if (rowCount > 0) { + this._queryRunner.getQueryRows(0, rowCount, resultSet.batchId, resultSet.id).then((result) => { + this._querySubsetResultMap.set(resultSet.id, result); + deferred.resolve(); + }, (err) => { + this._querySubsetResultMap.set(resultSet.id, { rowCount: 0, rows: [] }); + deferred.reject(err); + }); + } else { + this._querySubsetResultMap.set(resultSet.id, { rowCount: 0, rows: [] }); + deferred.resolve(); + } + return deferred; + } + + private sendResultSetAsIOPub(resultSet: ResultSetSummary): void { + if (this._querySubsetResultMap && this._querySubsetResultMap.get(resultSet.id)) { + let subsetResult = this._querySubsetResultMap.get(resultSet.id); + this.sendIOPubMessage(subsetResult, resultSet); + } + } + + private sendIOPubMessage(subsetResult: ResultSetSubset, resultSet: ResultSetSummary): void { let msg: nb.IIOPubMessage = { channel: 'iopub', type: 'iopub', @@ -503,21 +538,16 @@ export class SQLFuture extends Disposable implements FutureInternal { output_type: 'execute_result', metadata: {}, execution_count: this._executionCount, - // Initial data sent to notebook only contains column headers since - // onResultSet only returns the column info (and no row data). - // Row data conversion will be handled in DataResourceDataProvider data: { - 'application/vnd.dataresource+json': this.convertToDataResource(resultSet.columnInfo), - 'text/html': this.convertToHtmlTable(resultSet.columnInfo) - }, - batchId: resultSet.batchId, - id: resultSet.id, - queryRunnerUri: this._queryRunner.uri, + 'application/vnd.dataresource+json': this.convertToDataResource(resultSet.columnInfo, subsetResult), + 'text/html': this.convertToHtmlTable(resultSet.columnInfo, subsetResult) + } }, metadata: undefined, parent_header: undefined }; this.ioHandler.handle(msg); + this._querySubsetResultMap.delete(resultSet.id); } setIOPubHandler(handler: nb.MessageHandler): void { @@ -531,31 +561,49 @@ export class SQLFuture extends Disposable implements FutureInternal { // no-op } - private convertToDataResource(columns: IColumn[]): IDataResource { + private convertToDataResource(columns: IColumn[], subsetResult: ResultSetSubset): IDataResource { let columnsResources: IDataResourceSchema[] = []; columns.forEach(column => { columnsResources.push({ name: escape(column.columnName) }); }); - let columnsFields: IDataResourceFields = { fields: columnsResources }; + let columnsFields: IDataResourceFields = { fields: undefined }; + columnsFields.fields = columnsResources; return { schema: columnsFields, - data: [] + data: subsetResult.rows.map(row => { + let rowObject: { [key: string]: any; } = {}; + row.forEach((val, index) => { + rowObject[index] = val.displayValue; + }); + return rowObject; + }) }; } - private convertToHtmlTable(columns: IColumn[]): string[] { - let htmlTable: string[] = new Array(3); - htmlTable[0] = ''; + private convertToHtmlTable(columns: IColumn[], d: ResultSetSubset): string[] { + // Adding 3 for
, column title rows,
+ let htmlStringArr: string[] = new Array(d.rowCount + 3); + htmlStringArr[0] = ''; if (columns.length > 0) { let columnHeaders = ''; for (let column of columns) { columnHeaders += ``; } columnHeaders += ''; - htmlTable[1] = columnHeaders; + htmlStringArr[1] = columnHeaders; } - htmlTable[2] = '
${escape(column.columnName)}
'; - return htmlTable; + let i = 2; + for (const row of d.rows) { + let rowData = ''; + for (let columnIndex = 0; columnIndex < columns.length; columnIndex++) { + rowData += `${escape(row[columnIndex].displayValue)}`; + } + rowData += ''; + htmlStringArr[i] = rowData; + i++; + } + htmlStringArr[htmlStringArr.length - 1] = ''; + return htmlStringArr; } private convertToDisplayMessage(msg: IResultMessage | string): nb.IIOPubMessage {