Add grid streaming support for notebooks (#12175)

* add onResultUpdate handler in gridoutput

* convert rows to mimetype and html

* wait for data conversion to finish before saving

* detach changeRef after output is created

* fix save grid action

* move data conversion check to each cell

* move conversion logic to dataprovider

* notify data converting when user saves

* add comments and remove unused methods

* fix method return type

* fix tests

* fix convertData method header

* move azdata changes to azdata proposed

* address PR comments

* display top rows message

* fix messages/table ordering and query 100 rows

* add missing escape import

* set default max rows to 5000

* add undefined check to updateResultSet

* change gridDataConversionComplete return type
This commit is contained in:
Lucy Zhang
2020-09-10 13:31:40 -07:00
committed by GitHub
parent 1528c642d1
commit e3ec6bf9c5
20 changed files with 400 additions and 132 deletions

View File

@@ -64,6 +64,7 @@ export class CellModel extends Disposable implements ICellModel {
private _showPreview: boolean = true;
private _onCellPreviewChanged = new Emitter<boolean>();
private _isCommandExecutionSettingEnabled: boolean = false;
private _gridDataConversionComplete: Promise<void>[] = [];
constructor(cellData: nb.ICellContents,
private _options: ICellModelOptions,
@@ -338,6 +339,8 @@ export class CellModel extends Disposable implements ICellModel {
public async runCell(notificationService?: INotificationService, connectionManagementService?: IConnectionManagementService): Promise<boolean> {
try {
// Clear grid data conversion promises from previous execution results
this._gridDataConversionComplete = [];
if (!this.active && this !== this.notebookModel.activeCell) {
this.notebookModel.updateActiveCell(this);
this.active = true;
@@ -523,6 +526,25 @@ export class CellModel extends Disposable implements ICellModel {
return this._outputs;
}
public updateOutputData(batchId: number, id: number, data: any) {
for (let i = 0; i < this._outputs.length; i++) {
if (this._outputs[i].output_type === 'execute_result'
&& (<nb.IExecuteResult>this._outputs[i]).batchId === batchId
&& (<nb.IExecuteResult>this._outputs[i]).id === id) {
(<nb.IExecuteResult>this._outputs[i]).data = data;
break;
}
}
}
public get gridDataConversionComplete(): Promise<void> {
return Promise.all(this._gridDataConversionComplete).then();
}
public addGridDataConversionPromise(complete: Promise<void>): void {
this._gridDataConversionComplete.push(complete);
}
public get renderedOutputTextContent(): string[] {
return this._renderedOutputTextContent;
}
@@ -579,7 +601,22 @@ export class CellModel extends Disposable implements ICellModel {
if (output) {
// deletes transient node in the serialized JSON
delete output['transient'];
this._outputs.push(this.rewriteOutputUrls(output));
// display message outputs before grid outputs
if (output.output_type === 'display_data' && this._outputs.length > 0) {
let added = false;
for (let i = 0; i < this._outputs.length; i++) {
if (this._outputs[i].output_type === 'execute_result') {
this._outputs.splice(i, 0, this.rewriteOutputUrls(output));
added = true;
break;
}
}
if (!added) {
this._outputs.push(this.rewriteOutputUrls(output));
}
} else {
this._outputs.push(this.rewriteOutputUrls(output));
}
// Only scroll on 1st output being added
let shouldScroll = this._outputs.length === 1;
this.fireOutputsChanged(shouldScroll);

View File

@@ -237,6 +237,10 @@ export interface INotebookModel {
* Promise indicating when client session is ready to use.
*/
readonly sessionLoadFinished: Promise<void>;
/**
* Promise indicating when output grid data is converted to mimeType and html.
*/
gridDataConversionComplete: Promise<any>;
/**
* LanguageInfo saved in the notebook
*/
@@ -480,6 +484,9 @@ export interface ICellModel {
showPreview: boolean;
readonly onCellPreviewChanged: Event<boolean>;
sendChangeToNotebook(change: NotebookChangeType): void;
gridDataConversionComplete: Promise<void>;
addGridDataConversionPromise(complete: Promise<void>): void;
updateOutputData(batchId: number, id: number, data: any): void;
}
export interface IModelFactory {

View File

@@ -271,6 +271,17 @@ export class NotebookModel extends Disposable implements INotebookModel {
return this._sessionLoadFinished;
}
/**
* Indicates all result grid output has been converted to mimeType and html.
*/
public get gridDataConversionComplete(): Promise<any> {
let promises = [];
for (let cell of this._cells) {
promises.push(cell.gridDataConversionComplete);
}
return Promise.all(promises);
}
/**
* Notifies when the client session is ready for use
*/

View File

@@ -6,7 +6,7 @@
import { nb, IResultMessage } from 'azdata';
import { localize } from 'vs/nls';
import QueryRunner from 'sql/workbench/services/query/common/queryRunner';
import { BatchSummary, ResultSetSummary, IColumn, ResultSetSubset } from 'sql/workbench/services/query/common/query';
import { ResultSetSummary, ResultSetSubset, IColumn, BatchSummary } from 'sql/workbench/services/query/common/query';
import { IConnectionManagementService } from 'sql/platform/connection/common/connectionManagement';
import { IInstantiationService } from 'vs/platform/instantiation/common/instantiation';
import Severity from 'vs/base/common/severity';
@@ -14,8 +14,8 @@ import { Deferred } from 'sql/base/common/promise';
import { Disposable } from 'vs/base/common/lifecycle';
import { IErrorMessageService } from 'sql/platform/errorMessage/common/errorMessageService';
import { ConnectionProfile } from 'sql/platform/connection/common/connectionProfile';
import { IConnectionProfile } from 'sql/platform/connection/common/interfaces';
import { escape } from 'sql/base/common/strings';
import { IConnectionProfile } from 'sql/platform/connection/common/interfaces';
import { IConfigurationService } from 'vs/platform/configuration/common/configuration';
import { ICapabilitiesService } from 'sql/platform/capabilities/common/capabilitiesService';
import { ILogService } from 'vs/platform/log/common/log';
@@ -29,6 +29,7 @@ import { startsWith } from 'vs/base/common/strings';
import { onUnexpectedError } from 'vs/base/common/errors';
import { FutureInternal, notebookConstants } from 'sql/workbench/services/notebook/browser/interfaces';
import { tryMatchCellMagic } from 'sql/workbench/services/notebook/browser/utils';
import { IQueryManagementService } from 'sql/workbench/services/query/common/queryManagement';
export const sqlKernelError: string = localize("sqlKernelError", "SQL kernel error");
export const MAX_ROWS = 5000;
@@ -176,7 +177,8 @@ class SqlKernel extends Disposable implements nb.IKernel {
@IErrorMessageService private _errorMessageService: IErrorMessageService,
@IConfigurationService private _configurationService: IConfigurationService,
@ILogService private readonly logService: ILogService,
@ITextResourcePropertiesService private readonly textResourcePropertiesService: ITextResourcePropertiesService
@ITextResourcePropertiesService private readonly textResourcePropertiesService: ITextResourcePropertiesService,
@IQueryManagementService private queryManagementService: IQueryManagementService
) {
super();
this.initMagics();
@@ -283,6 +285,7 @@ class SqlKernel extends Disposable implements nb.IKernel {
this._queryRunner.runQuery(code).catch(err => onUnexpectedError(err));
} else if (this._currentConnection && this._currentConnectionProfile) {
this._queryRunner = this._instantiationService.createInstance(QueryRunner, this._connectionPath);
this.queryManagementService.registerRunner(this._queryRunner, this._connectionPath);
this._connectionManagementService.connect(this._currentConnectionProfile, this._connectionPath).then((result) => {
this.addQueryEventListeners(this._queryRunner);
this._queryRunner.runQuery(code).catch(err => onUnexpectedError(err));
@@ -349,9 +352,14 @@ class SqlKernel extends Disposable implements nb.IKernel {
}
}
}));
this._register(queryRunner.onResultSet(resultSet => {
if (this._future) {
this._future.onResultSet(resultSet);
}
}));
this._register(queryRunner.onBatchEnd(batch => {
if (this._future) {
this._future.handleBatchEnd(batch);
this._future.onBatchEnd(batch);
}
}));
}
@@ -384,9 +392,9 @@ export class SQLFuture extends Disposable implements FutureInternal {
private doneDeferred = new Deferred<nb.IShellMessage>();
private configuredMaxRows: number = MAX_ROWS;
private _outputAddedPromises: Promise<void>[] = [];
private _querySubsetResultMap: Map<number, ResultSetSubset> = new Map<number, ResultSetSubset>();
private _errorOccurred: boolean = false;
private _stopOnError: boolean = true;
constructor(
private _queryRunner: QueryRunner,
private _executionCount: number | undefined,
@@ -442,7 +450,6 @@ export class SQLFuture extends Disposable implements FutureInternal {
this.doneHandler.handle(msg);
}
this.doneDeferred.resolve(msg);
this._querySubsetResultMap.clear();
}
sendInputReply(content: nb.IInputReply): void {
@@ -473,28 +480,32 @@ export class SQLFuture extends Disposable implements FutureInternal {
}
}
public handleBatchEnd(batch: BatchSummary): void {
public onResultSet(resultSet: ResultSetSummary | ResultSetSummary[]): void {
if (this.ioHandler) {
this._outputAddedPromises.push(this.processResultSets(batch));
this._outputAddedPromises.push(this.sendInitialResultSets(resultSet));
}
}
private async processResultSets(batch: BatchSummary): Promise<void> {
try {
let queryRowsPromises: Promise<void>[] = [];
for (let resultSet of batch.resultSetSummaries) {
let rowCount = resultSet.rowCount > this.configuredMaxRows ? this.configuredMaxRows : resultSet.rowCount;
if (rowCount === this.configuredMaxRows) {
this.handleMessage(localize('sqlMaxRowsDisplayed', "Displaying Top {0} rows.", rowCount));
public onBatchEnd(batch: BatchSummary): void {
if (this.ioHandler) {
for (let set of batch.resultSetSummaries) {
if (set.rowCount > this.configuredMaxRows) {
this.handleMessage(localize('sqlMaxRowsDisplayed', "Displaying Top {0} rows.", this.configuredMaxRows));
}
queryRowsPromises.push(this.getAllQueryRows(rowCount, resultSet));
}
// We want to display table in the same order
let i = 0;
for (let resultSet of batch.resultSetSummaries) {
await queryRowsPromises[i];
this.sendResultSetAsIOPub(resultSet);
i++;
}
}
private async sendInitialResultSets(resultSet: ResultSetSummary | ResultSetSummary[]): Promise<void> {
try {
let resultsToAdd: ResultSetSummary[];
if (!Array.isArray(resultSet)) {
resultsToAdd = [resultSet];
} else {
resultsToAdd = resultSet?.splice(0);
}
for (let set of resultsToAdd) {
this.sendIOPubMessage(set, false);
}
} catch (err) {
// TODO should we output this somewhere else?
@@ -502,31 +513,7 @@ export class SQLFuture extends Disposable implements FutureInternal {
}
}
private async getAllQueryRows(rowCount: number, resultSet: ResultSetSummary): Promise<void> {
let deferred: Deferred<void> = new Deferred<void>();
if (rowCount > 0) {
this._queryRunner.getQueryRows(0, rowCount, resultSet.batchId, resultSet.id).then((result) => {
this._querySubsetResultMap.set(resultSet.id, result);
deferred.resolve();
}, (err) => {
this._querySubsetResultMap.set(resultSet.id, { rowCount: 0, rows: [] });
deferred.reject(err);
});
} else {
this._querySubsetResultMap.set(resultSet.id, { rowCount: 0, rows: [] });
deferred.resolve();
}
return deferred;
}
private sendResultSetAsIOPub(resultSet: ResultSetSummary): void {
if (this._querySubsetResultMap && this._querySubsetResultMap.get(resultSet.id)) {
let subsetResult = this._querySubsetResultMap.get(resultSet.id);
this.sendIOPubMessage(subsetResult, resultSet);
}
}
private sendIOPubMessage(subsetResult: ResultSetSubset, resultSet: ResultSetSummary): void {
private sendIOPubMessage(resultSet: ResultSetSummary, conversionComplete?: boolean, subsetResult?: ResultSetSubset): void {
let msg: nb.IIOPubMessage = {
channel: 'iopub',
type: 'iopub',
@@ -538,16 +525,21 @@ export class SQLFuture extends Disposable implements FutureInternal {
output_type: 'execute_result',
metadata: {},
execution_count: this._executionCount,
// Initial data sent to notebook only contains column headers since
// onResultSet only returns the column info (and no row data).
// Row data conversion will be handled in DataResourceDataProvider
data: {
'application/vnd.dataresource+json': this.convertToDataResource(resultSet.columnInfo, subsetResult),
'text/html': this.convertToHtmlTable(resultSet.columnInfo, subsetResult)
}
'application/vnd.dataresource+json': this.convertToDataResource(resultSet.columnInfo),
'text/html': this.convertToHtmlTable(resultSet.columnInfo)
},
batchId: resultSet.batchId,
id: resultSet.id,
queryRunnerUri: this._queryRunner.uri,
},
metadata: undefined,
parent_header: undefined
};
this.ioHandler.handle(msg);
this._querySubsetResultMap.delete(resultSet.id);
}
setIOPubHandler(handler: nb.MessageHandler<nb.IIOPubMessage>): void {
@@ -561,49 +553,31 @@ export class SQLFuture extends Disposable implements FutureInternal {
// no-op
}
private convertToDataResource(columns: IColumn[], subsetResult: ResultSetSubset): IDataResource {
private convertToDataResource(columns: IColumn[]): IDataResource {
let columnsResources: IDataResourceSchema[] = [];
columns.forEach(column => {
columnsResources.push({ name: escape(column.columnName) });
});
let columnsFields: IDataResourceFields = { fields: undefined };
columnsFields.fields = columnsResources;
let columnsFields: IDataResourceFields = { fields: columnsResources };
return {
schema: columnsFields,
data: subsetResult.rows.map(row => {
let rowObject: { [key: string]: any; } = {};
row.forEach((val, index) => {
rowObject[index] = val.displayValue;
});
return rowObject;
})
data: []
};
}
private convertToHtmlTable(columns: IColumn[], d: ResultSetSubset): string[] {
// Adding 3 for <table>, column title rows, </table>
let htmlStringArr: string[] = new Array(d.rowCount + 3);
htmlStringArr[0] = '<table>';
private convertToHtmlTable(columns: IColumn[]): string[] {
let htmlTable: string[] = new Array(3);
htmlTable[0] = '<table>';
if (columns.length > 0) {
let columnHeaders = '<tr>';
for (let column of columns) {
columnHeaders += `<th>${escape(column.columnName)}</th>`;
}
columnHeaders += '</tr>';
htmlStringArr[1] = columnHeaders;
htmlTable[1] = columnHeaders;
}
let i = 2;
for (const row of d.rows) {
let rowData = '<tr>';
for (let columnIndex = 0; columnIndex < columns.length; columnIndex++) {
rowData += `<td>${escape(row[columnIndex].displayValue)}</td>`;
}
rowData += '</tr>';
htmlStringArr[i] = rowData;
i++;
}
htmlStringArr[htmlStringArr.length - 1] = '</table>';
return htmlStringArr;
htmlTable[2] = '</table>';
return htmlTable;
}
private convertToDisplayMessage(msg: IResultMessage | string): nb.IIOPubMessage {