revert grid streaming changes (#12650)

This commit is contained in:
Lucy Zhang
2020-09-28 18:34:58 -07:00
committed by GitHub
parent 44416abe6e
commit cf9754f627
4 changed files with 125 additions and 79 deletions

4
src/sql/azdata.d.ts vendored
View File

@@ -4904,8 +4904,6 @@ declare module 'azdata' {
* *
* @param disposeOnDone - Whether to dispose of the future when done. * @param disposeOnDone - Whether to dispose of the future when done.
* *
* @param cellId - Cell id (used by queryRunner)
*
* @returns A kernel future. * @returns A kernel future.
* *
* #### Notes * #### Notes
@@ -4920,7 +4918,7 @@ declare module 'azdata' {
* *
* **See also:** [[IExecuteReply]] * **See also:** [[IExecuteReply]]
*/ */
requestExecute(content: IExecuteRequest, disposeOnDone?: boolean, cellUri?: string): IFuture; requestExecute(content: IExecuteRequest, disposeOnDone?: boolean): IFuture;
/** /**
* Send a `complete_request` message. * Send a `complete_request` message.

View File

@@ -778,7 +778,7 @@ suite('Cell Model', function (): void {
test('Execute returns error status', async function (): Promise<void> { test('Execute returns error status', async function (): Promise<void> {
mockKernel.setup(k => k.requiresConnection).returns(() => false); mockKernel.setup(k => k.requiresConnection).returns(() => false);
mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => {
let replyMsg: nb.IExecuteReplyMsg = <nb.IExecuteReplyMsg>{ let replyMsg: nb.IExecuteReplyMsg = <nb.IExecuteReplyMsg>{
content: <nb.IExecuteReply>{ content: <nb.IExecuteReply>{
execution_count: 1, execution_count: 1,
@@ -796,7 +796,7 @@ suite('Cell Model', function (): void {
test('Execute returns abort status', async function (): Promise<void> { test('Execute returns abort status', async function (): Promise<void> {
mockKernel.setup(k => k.requiresConnection).returns(() => false); mockKernel.setup(k => k.requiresConnection).returns(() => false);
mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => {
let replyMsg: nb.IExecuteReplyMsg = <nb.IExecuteReplyMsg>{ let replyMsg: nb.IExecuteReplyMsg = <nb.IExecuteReplyMsg>{
content: <nb.IExecuteReply>{ content: <nb.IExecuteReply>{
execution_count: 1, execution_count: 1,
@@ -815,7 +815,7 @@ suite('Cell Model', function (): void {
test('Execute throws exception', async function (): Promise<void> { test('Execute throws exception', async function (): Promise<void> {
let testMsg = 'Test message'; let testMsg = 'Test message';
mockKernel.setup(k => k.requiresConnection).returns(() => false); mockKernel.setup(k => k.requiresConnection).returns(() => false);
mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => { mockKernel.setup(k => k.requestExecute(TypeMoq.It.isAny(), TypeMoq.It.isAny())).returns(() => {
throw new Error(testMsg); throw new Error(testMsg);
}); });

View File

@@ -408,7 +408,7 @@ export class CellModel extends Disposable implements ICellModel {
const future = kernel.requestExecute({ const future = kernel.requestExecute({
code: content, code: content,
stop_on_error: true stop_on_error: true
}, false, this._cellUri.toString()); }, false);
this.setFuture(future as FutureInternal); this.setFuture(future as FutureInternal);
this.fireExecutionStateChanged(); this.fireExecutionStateChanged();
// For now, await future completion. Later we should just track and handle cancellation based on model notifications // For now, await future completion. Later we should just track and handle cancellation based on model notifications

View File

@@ -23,12 +23,12 @@ import { isUndefinedOrNull } from 'vs/base/common/types';
import { ILanguageMagic } from 'sql/workbench/services/notebook/browser/notebookService'; import { ILanguageMagic } from 'sql/workbench/services/notebook/browser/notebookService';
import { ITextResourcePropertiesService } from 'vs/editor/common/services/textResourceConfigurationService'; import { ITextResourcePropertiesService } from 'vs/editor/common/services/textResourceConfigurationService';
import { URI } from 'vs/base/common/uri'; import { URI } from 'vs/base/common/uri';
import { getUriPrefix, uriPrefixes } from 'sql/platform/connection/common/utils';
import { firstIndex } from 'vs/base/common/arrays'; import { firstIndex } from 'vs/base/common/arrays';
import { startsWith } from 'vs/base/common/strings'; import { startsWith } from 'vs/base/common/strings';
import { onUnexpectedError } from 'vs/base/common/errors'; import { onUnexpectedError } from 'vs/base/common/errors';
import { FutureInternal, notebookConstants } from 'sql/workbench/services/notebook/browser/interfaces'; import { FutureInternal, notebookConstants } from 'sql/workbench/services/notebook/browser/interfaces';
import { tryMatchCellMagic } from 'sql/workbench/services/notebook/browser/utils'; import { tryMatchCellMagic } from 'sql/workbench/services/notebook/browser/utils';
import { IQueryManagementService } from 'sql/workbench/services/query/common/queryManagement';
export const sqlKernelError: string = localize("sqlKernelError", "SQL kernel error"); export const sqlKernelError: string = localize("sqlKernelError", "SQL kernel error");
export const MAX_ROWS = 5000; export const MAX_ROWS = 5000;
@@ -158,7 +158,7 @@ export class SqlSession implements nb.ISession {
} }
class SqlKernel extends Disposable implements nb.IKernel { class SqlKernel extends Disposable implements nb.IKernel {
private _queryRunners: Map<string, QueryRunner> = new Map<string, QueryRunner>(); private _queryRunner: QueryRunner;
private _currentConnection: IConnectionProfile; private _currentConnection: IConnectionProfile;
private _currentConnectionProfile: ConnectionProfile; private _currentConnectionProfile: ConnectionProfile;
static kernelId: number = 0; static kernelId: number = 0;
@@ -167,6 +167,7 @@ class SqlKernel extends Disposable implements nb.IKernel {
private _future: SQLFuture; private _future: SQLFuture;
private _executionCount: number = 0; private _executionCount: number = 0;
private _magicToExecutorMap = new Map<string, ExternalScriptMagic>(); private _magicToExecutorMap = new Map<string, ExternalScriptMagic>();
private _connectionPath: string;
constructor(private _path: string, constructor(private _path: string,
@IConnectionManagementService private _connectionManagementService: IConnectionManagementService, @IConnectionManagementService private _connectionManagementService: IConnectionManagementService,
@@ -175,11 +176,11 @@ class SqlKernel extends Disposable implements nb.IKernel {
@IErrorMessageService private _errorMessageService: IErrorMessageService, @IErrorMessageService private _errorMessageService: IErrorMessageService,
@IConfigurationService private _configurationService: IConfigurationService, @IConfigurationService private _configurationService: IConfigurationService,
@ILogService private readonly logService: ILogService, @ILogService private readonly logService: ILogService,
@ITextResourcePropertiesService private readonly textResourcePropertiesService: ITextResourcePropertiesService, @ITextResourcePropertiesService private readonly textResourcePropertiesService: ITextResourcePropertiesService
@IQueryManagementService private queryManagementService: IQueryManagementService
) { ) {
super(); super();
this.initMagics(); this.initMagics();
this.setConnectionPath();
} }
private initMagics(): void { private initMagics(): void {
@@ -189,6 +190,29 @@ class SqlKernel extends Disposable implements nb.IKernel {
} }
} }
private setConnectionPath(): void {
if (this._path) {
let prefix = getUriPrefix(this._path);
if (!prefix || prefix === uriPrefixes.connection) {
this._connectionPath = uriPrefixes.notebook.concat(this._path);
} else if (prefix !== uriPrefixes.notebook) {
try {
let uri = URI.parse(this._path);
if (uri && uri.scheme) {
this._connectionPath = uri.toString().replace(uri.scheme, uriPrefixes.notebook);
}
} catch {
// Ignore exceptions from URI parsing
} finally {
// If _connectionPath hasn't been set yet, set _connectionPath to _path as a last resort
if (!this._connectionPath) {
this._connectionPath = this._path;
}
}
}
}
}
public get id(): string { public get id(): string {
if (this._id === undefined) { if (this._id === undefined) {
this._id = (SqlKernel.kernelId++).toString(); this._id = (SqlKernel.kernelId++).toString();
@@ -239,33 +263,29 @@ class SqlKernel extends Disposable implements nb.IKernel {
public set connection(conn: IConnectionProfile) { public set connection(conn: IConnectionProfile) {
this._currentConnection = conn; this._currentConnection = conn;
this._currentConnectionProfile = new ConnectionProfile(this._capabilitiesService, this._currentConnection); this._currentConnectionProfile = new ConnectionProfile(this._capabilitiesService, this._currentConnection);
this._queryRunners.clear(); this._queryRunner = undefined;
} }
getSpec(): Thenable<nb.IKernelSpec> { getSpec(): Thenable<nb.IKernelSpec> {
return Promise.resolve(notebookConstants.sqlKernelSpec); return Promise.resolve(notebookConstants.sqlKernelSpec);
} }
requestExecute(content: nb.IExecuteRequest, disposeOnDone?: boolean, cellUri?: string): nb.IFuture { requestExecute(content: nb.IExecuteRequest, disposeOnDone?: boolean): nb.IFuture {
let canRun: boolean = true; let canRun: boolean = true;
let code = this.getCodeWithoutCellMagic(content); let code = this.getCodeWithoutCellMagic(content);
let queryRunnerUri = 'queryRunner-' + cellUri; if (this._queryRunner) {
let queryRunner: QueryRunner | undefined = this._queryRunners.get(queryRunnerUri);
if (queryRunner) {
// Cancel any existing query // Cancel any existing query
if (this._future && !queryRunner.hasCompleted) { if (this._future && !this._queryRunner.hasCompleted) {
queryRunner.cancelQuery().then(ok => undefined, error => this._errorMessageService.showDialog(Severity.Error, sqlKernelError, error)); this._queryRunner.cancelQuery().then(ok => undefined, error => this._errorMessageService.showDialog(Severity.Error, sqlKernelError, error));
// TODO when we can just show error as an output, should show an "execution canceled" error in output // TODO when we can just show error as an output, should show an "execution canceled" error in output
this._future.handleDone().catch(err => onUnexpectedError(err)); this._future.handleDone().catch(err => onUnexpectedError(err));
} }
queryRunner.runQuery(code).catch(err => onUnexpectedError(err)); this._queryRunner.runQuery(code).catch(err => onUnexpectedError(err));
} else if (this._currentConnection && this._currentConnectionProfile) { } else if (this._currentConnection && this._currentConnectionProfile) {
queryRunner = this._instantiationService.createInstance(QueryRunner, queryRunnerUri); this._queryRunner = this._instantiationService.createInstance(QueryRunner, this._connectionPath);
this._queryRunners.set(queryRunnerUri, queryRunner); this._connectionManagementService.connect(this._currentConnectionProfile, this._connectionPath).then((result) => {
this.queryManagementService.registerRunner(queryRunner, queryRunnerUri); this.addQueryEventListeners(this._queryRunner);
this._connectionManagementService.connect(this._currentConnectionProfile, queryRunnerUri).then((result) => { this._queryRunner.runQuery(code).catch(err => onUnexpectedError(err));
this.addQueryEventListeners(queryRunner);
queryRunner.runQuery(code).catch(err => onUnexpectedError(err));
}).catch(err => onUnexpectedError(err)); }).catch(err => onUnexpectedError(err));
} else { } else {
canRun = false; canRun = false;
@@ -275,7 +295,7 @@ class SqlKernel extends Disposable implements nb.IKernel {
// TODO verify this is "canonical" behavior // TODO verify this is "canonical" behavior
let count = canRun ? ++this._executionCount : undefined; let count = canRun ? ++this._executionCount : undefined;
this._future = new SQLFuture(queryRunner, count, this._configurationService, this.logService); this._future = new SQLFuture(this._queryRunner, count, this._configurationService, this.logService);
if (!canRun) { if (!canRun) {
// Complete early // Complete early
this._future.handleDone(new Error(localize('connectionRequired', "A connection must be chosen to run notebook cells"))).catch(err => onUnexpectedError(err)); this._future.handleDone(new Error(localize('connectionRequired', "A connection must be chosen to run notebook cells"))).catch(err => onUnexpectedError(err));
@@ -311,8 +331,8 @@ class SqlKernel extends Disposable implements nb.IKernel {
interrupt(): Thenable<void> { interrupt(): Thenable<void> {
// TODO: figure out what to do with the QueryCancelResult // TODO: figure out what to do with the QueryCancelResult
let runners = [...this._queryRunners.values()]; return this._queryRunner.cancelQuery().then((cancelResult) => {
return Promise.all(runners.map(queryRunner => queryRunner.cancelQuery())).then(); });
} }
private addQueryEventListeners(queryRunner: QueryRunner): void { private addQueryEventListeners(queryRunner: QueryRunner): void {
@@ -329,14 +349,9 @@ class SqlKernel extends Disposable implements nb.IKernel {
} }
} }
})); }));
this._register(queryRunner.onResultSet(resultSet => {
if (this._future) {
this._future.onResultSet(resultSet);
}
}));
this._register(queryRunner.onBatchEnd(batch => { this._register(queryRunner.onBatchEnd(batch => {
if (this._future) { if (this._future) {
this._future.onBatchEnd(batch); this._future.handleBatchEnd(batch);
} }
})); }));
} }
@@ -349,16 +364,15 @@ class SqlKernel extends Disposable implements nb.IKernel {
} }
public async disconnect(): Promise<void> { public async disconnect(): Promise<void> {
this._queryRunners.forEach(async (queryRunner: QueryRunner, uri: string) => { if (this._connectionPath) {
if (this._connectionManagementService.isConnected(uri)) { if (this._connectionManagementService.isConnected(this._connectionPath)) {
try { try {
await this._connectionManagementService.disconnect(uri); await this._connectionManagementService.disconnect(this._connectionPath);
} catch (err) { } catch (err) {
this.logService.error(err); this.logService.error(err);
} }
} }
}); }
return; return;
} }
} }
@@ -370,9 +384,9 @@ export class SQLFuture extends Disposable implements FutureInternal {
private doneDeferred = new Deferred<nb.IShellMessage>(); private doneDeferred = new Deferred<nb.IShellMessage>();
private configuredMaxRows: number = MAX_ROWS; private configuredMaxRows: number = MAX_ROWS;
private _outputAddedPromises: Promise<void>[] = []; private _outputAddedPromises: Promise<void>[] = [];
private _querySubsetResultMap: Map<number, ResultSetSubset> = new Map<number, ResultSetSubset>();
private _errorOccurred: boolean = false; private _errorOccurred: boolean = false;
private _stopOnError: boolean = true; private _stopOnError: boolean = true;
constructor( constructor(
private _queryRunner: QueryRunner, private _queryRunner: QueryRunner,
private _executionCount: number | undefined, private _executionCount: number | undefined,
@@ -428,6 +442,7 @@ export class SQLFuture extends Disposable implements FutureInternal {
this.doneHandler.handle(msg); this.doneHandler.handle(msg);
} }
this.doneDeferred.resolve(msg); this.doneDeferred.resolve(msg);
this._querySubsetResultMap.clear();
} }
sendInputReply(content: nb.IInputReply): void { sendInputReply(content: nb.IInputReply): void {
@@ -458,32 +473,28 @@ export class SQLFuture extends Disposable implements FutureInternal {
} }
} }
public onResultSet(resultSet: ResultSetSummary | ResultSetSummary[]): void { public handleBatchEnd(batch: BatchSummary): void {
if (this.ioHandler) { if (this.ioHandler) {
this._outputAddedPromises.push(this.sendInitialResultSets(resultSet)); this._outputAddedPromises.push(this.processResultSets(batch));
} }
} }
public onBatchEnd(batch: BatchSummary): void { private async processResultSets(batch: BatchSummary): Promise<void> {
if (this.ioHandler) {
for (let set of batch.resultSetSummaries) {
if (set.rowCount > this.configuredMaxRows) {
this.handleMessage(localize('sqlMaxRowsDisplayed', "Displaying Top {0} rows.", this.configuredMaxRows));
}
}
}
}
private async sendInitialResultSets(resultSet: ResultSetSummary | ResultSetSummary[]): Promise<void> {
try { try {
let resultsToAdd: ResultSetSummary[]; let queryRowsPromises: Promise<void>[] = [];
if (!Array.isArray(resultSet)) { for (let resultSet of batch.resultSetSummaries) {
resultsToAdd = [resultSet]; let rowCount = resultSet.rowCount > this.configuredMaxRows ? this.configuredMaxRows : resultSet.rowCount;
} else { if (rowCount === this.configuredMaxRows) {
resultsToAdd = resultSet?.splice(0); this.handleMessage(localize('sqlMaxRowsDisplayed', "Displaying Top {0} rows.", rowCount));
}
queryRowsPromises.push(this.getAllQueryRows(rowCount, resultSet));
} }
for (let set of resultsToAdd) { // We want to display table in the same order
this.sendIOPubMessage(set, false); let i = 0;
for (let resultSet of batch.resultSetSummaries) {
await queryRowsPromises[i];
this.sendResultSetAsIOPub(resultSet);
i++;
} }
} catch (err) { } catch (err) {
// TODO should we output this somewhere else? // TODO should we output this somewhere else?
@@ -491,7 +502,31 @@ export class SQLFuture extends Disposable implements FutureInternal {
} }
} }
private sendIOPubMessage(resultSet: ResultSetSummary, conversionComplete?: boolean, subsetResult?: ResultSetSubset): void { private async getAllQueryRows(rowCount: number, resultSet: ResultSetSummary): Promise<void> {
let deferred: Deferred<void> = new Deferred<void>();
if (rowCount > 0) {
this._queryRunner.getQueryRows(0, rowCount, resultSet.batchId, resultSet.id).then((result) => {
this._querySubsetResultMap.set(resultSet.id, result);
deferred.resolve();
}, (err) => {
this._querySubsetResultMap.set(resultSet.id, { rowCount: 0, rows: [] });
deferred.reject(err);
});
} else {
this._querySubsetResultMap.set(resultSet.id, { rowCount: 0, rows: [] });
deferred.resolve();
}
return deferred;
}
private sendResultSetAsIOPub(resultSet: ResultSetSummary): void {
if (this._querySubsetResultMap && this._querySubsetResultMap.get(resultSet.id)) {
let subsetResult = this._querySubsetResultMap.get(resultSet.id);
this.sendIOPubMessage(subsetResult, resultSet);
}
}
private sendIOPubMessage(subsetResult: ResultSetSubset, resultSet: ResultSetSummary): void {
let msg: nb.IIOPubMessage = { let msg: nb.IIOPubMessage = {
channel: 'iopub', channel: 'iopub',
type: 'iopub', type: 'iopub',
@@ -503,21 +538,16 @@ export class SQLFuture extends Disposable implements FutureInternal {
output_type: 'execute_result', output_type: 'execute_result',
metadata: {}, metadata: {},
execution_count: this._executionCount, execution_count: this._executionCount,
// Initial data sent to notebook only contains column headers since
// onResultSet only returns the column info (and no row data).
// Row data conversion will be handled in DataResourceDataProvider
data: { data: {
'application/vnd.dataresource+json': this.convertToDataResource(resultSet.columnInfo), 'application/vnd.dataresource+json': this.convertToDataResource(resultSet.columnInfo, subsetResult),
'text/html': this.convertToHtmlTable(resultSet.columnInfo) 'text/html': this.convertToHtmlTable(resultSet.columnInfo, subsetResult)
}, }
batchId: resultSet.batchId,
id: resultSet.id,
queryRunnerUri: this._queryRunner.uri,
}, },
metadata: undefined, metadata: undefined,
parent_header: undefined parent_header: undefined
}; };
this.ioHandler.handle(msg); this.ioHandler.handle(msg);
this._querySubsetResultMap.delete(resultSet.id);
} }
setIOPubHandler(handler: nb.MessageHandler<nb.IIOPubMessage>): void { setIOPubHandler(handler: nb.MessageHandler<nb.IIOPubMessage>): void {
@@ -531,31 +561,49 @@ export class SQLFuture extends Disposable implements FutureInternal {
// no-op // no-op
} }
private convertToDataResource(columns: IColumn[]): IDataResource { private convertToDataResource(columns: IColumn[], subsetResult: ResultSetSubset): IDataResource {
let columnsResources: IDataResourceSchema[] = []; let columnsResources: IDataResourceSchema[] = [];
columns.forEach(column => { columns.forEach(column => {
columnsResources.push({ name: escape(column.columnName) }); columnsResources.push({ name: escape(column.columnName) });
}); });
let columnsFields: IDataResourceFields = { fields: columnsResources }; let columnsFields: IDataResourceFields = { fields: undefined };
columnsFields.fields = columnsResources;
return { return {
schema: columnsFields, schema: columnsFields,
data: [] data: subsetResult.rows.map(row => {
let rowObject: { [key: string]: any; } = {};
row.forEach((val, index) => {
rowObject[index] = val.displayValue;
});
return rowObject;
})
}; };
} }
private convertToHtmlTable(columns: IColumn[]): string[] { private convertToHtmlTable(columns: IColumn[], d: ResultSetSubset): string[] {
let htmlTable: string[] = new Array(3); // Adding 3 for <table>, column title rows, </table>
htmlTable[0] = '<table>'; let htmlStringArr: string[] = new Array(d.rowCount + 3);
htmlStringArr[0] = '<table>';
if (columns.length > 0) { if (columns.length > 0) {
let columnHeaders = '<tr>'; let columnHeaders = '<tr>';
for (let column of columns) { for (let column of columns) {
columnHeaders += `<th>${escape(column.columnName)}</th>`; columnHeaders += `<th>${escape(column.columnName)}</th>`;
} }
columnHeaders += '</tr>'; columnHeaders += '</tr>';
htmlTable[1] = columnHeaders; htmlStringArr[1] = columnHeaders;
} }
htmlTable[2] = '</table>'; let i = 2;
return htmlTable; for (const row of d.rows) {
let rowData = '<tr>';
for (let columnIndex = 0; columnIndex < columns.length; columnIndex++) {
rowData += `<td>${escape(row[columnIndex].displayValue)}</td>`;
}
rowData += '</tr>';
htmlStringArr[i] = rowData;
i++;
}
htmlStringArr[htmlStringArr.length - 1] = '</table>';
return htmlStringArr;
} }
private convertToDisplayMessage(msg: IResultMessage | string): nb.IIOPubMessage { private convertToDisplayMessage(msg: IResultMessage | string): nb.IIOPubMessage {