Spark features with dashboard are enabled (#3883)

* Spark features are enabled

* Fixed per PR comments

* minor change

* PR comments fixed

* minor fix

* Changed constant name to avoid conflicts with sqlopsextension

* Renamed sqlContext to context

* Changed tab name to SQL Server Big Data Cluster

* Added isCluster to ContextProvider to control display of the big data cluster dashboard tab
Ported New/Open Notebook code to the mssql extension and enabled it in the dashboard

* Fixed tslint
Authored by Gene Lee on 2019-02-06 11:54:25 -08:00; committed by Yurong He
parent 327a5f5fae
commit 8b9ce3e8de
36 changed files with 2090 additions and 477 deletions


@@ -0,0 +1,140 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import * as sqlops from 'sqlops';
import * as nls from 'vscode-nls';
import * as vscode from 'vscode';
const localize = nls.loadMessageBundle();
import { ICommandViewContext, Command, ICommandObjectExplorerContext, ICommandUnknownContext } from '../../objectExplorerNodeProvider/command';
import { SparkJobSubmissionDialog } from './sparkJobSubmission/sparkJobSubmissionDialog';
import { AppContext } from '../../appContext';
import { getErrorMessage } from '../../utils';
import * as constants from '../../constants';
import { HdfsFileSourceNode } from '../../objectExplorerNodeProvider/hdfsProvider';
import { getNode } from '../../objectExplorerNodeProvider/hdfsCommands';
import * as LocalizedConstants from '../../localizedConstants';
import * as SqlClusterLookUp from '../../sqlClusterLookUp';
import { SqlClusterConnection } from '../../objectExplorerNodeProvider/connection';
export class OpenSparkJobSubmissionDialogCommand extends Command {
constructor(appContext: AppContext, private outputChannel: vscode.OutputChannel) {
super(constants.mssqlClusterLivySubmitSparkJobCommand, appContext);
}
protected async preExecute(context: ICommandUnknownContext | ICommandObjectExplorerContext, args: object = {}): Promise<any> {
return this.execute(context, args);
}
async execute(context: ICommandUnknownContext | ICommandObjectExplorerContext, ...args: any[]): Promise<void> {
try {
let sqlClusterConnection: SqlClusterConnection = undefined;
if (context.type === constants.ObjectExplorerService) {
sqlClusterConnection = SqlClusterLookUp.findSqlClusterConnection(context, this.appContext);
}
if (!sqlClusterConnection) {
sqlClusterConnection = await this.selectConnection();
}
let dialog = new SparkJobSubmissionDialog(sqlClusterConnection, this.appContext, this.outputChannel);
await dialog.openDialog();
} catch (error) {
this.appContext.apiWrapper.showErrorMessage(getErrorMessage(error));
}
}
private async selectConnection(): Promise<SqlClusterConnection> {
let connectionList: sqlops.connection.Connection[] = await this.apiWrapper.getActiveConnections();
let displayList: string[] = [];
let connectionMap: Map<string, sqlops.connection.Connection> = new Map();
if (connectionList && connectionList.length > 0) {
connectionList.forEach(conn => {
if (conn.providerName === constants.sqlProviderName) {
displayList.push(conn.options.host);
connectionMap.set(conn.options.host, conn);
}
});
}
let selectedHost: string = await vscode.window.showQuickPick(displayList, {
placeHolder:
localize('sparkJobSubmission_PleaseSelectSqlWithCluster',
'Please select a SQL Server with a Big Data Cluster.')
});
let errorMsg = localize('sparkJobSubmission_NoSqlSelected', 'No SQL Server is selected.');
if (!selectedHost) { throw new Error(errorMsg); }
let sqlConnection = connectionMap.get(selectedHost);
if (!sqlConnection) { throw new Error(errorMsg); }
let sqlClusterConnection = await SqlClusterLookUp.getSqlClusterConnection(sqlConnection);
if (!sqlClusterConnection) {
throw new Error(LocalizedConstants.sparkJobSubmissionNoSqlBigDataClusterFound);
}
return new SqlClusterConnection(sqlClusterConnection);
}
}
// Open the submission dialog for a specific file path.
export class OpenSparkJobSubmissionDialogFromFileCommand extends Command {
constructor(appContext: AppContext, private outputChannel: vscode.OutputChannel) {
super(constants.mssqlClusterLivySubmitSparkJobFromFileCommand, appContext);
}
protected async preExecute(context: ICommandViewContext | ICommandObjectExplorerContext, args: object = {}): Promise<any> {
return this.execute(context, args);
}
async execute(context: ICommandViewContext | ICommandObjectExplorerContext, ...args: any[]): Promise<void> {
let path: string = undefined;
try {
let node = await getNode<HdfsFileSourceNode>(context, this.appContext);
if (node && node.hdfsPath) {
path = node.hdfsPath;
} else {
this.apiWrapper.showErrorMessage(LocalizedConstants.msgMissingNodeContext);
return;
}
} catch (err) {
this.apiWrapper.showErrorMessage(localize('sparkJobSubmission_GetFilePathFromSelectedNodeFailed', 'Error getting file path: {0}', getErrorMessage(err)));
return;
}
try {
let sqlClusterConnection: SqlClusterConnection = undefined;
if (context.type === constants.ObjectExplorerService) {
sqlClusterConnection = SqlClusterLookUp.findSqlClusterConnection(context, this.appContext);
}
if (!sqlClusterConnection) {
throw new Error(LocalizedConstants.sparkJobSubmissionNoSqlBigDataClusterFound);
}
let dialog = new SparkJobSubmissionDialog(sqlClusterConnection, this.appContext, this.outputChannel);
await dialog.openDialog(path);
} catch (error) {
this.appContext.apiWrapper.showErrorMessage(getErrorMessage(error));
}
}
}
export class OpenSparkJobSubmissionDialogTask {
constructor(private appContext: AppContext, private outputChannel: vscode.OutputChannel) {
}
async execute(profile: sqlops.IConnectionProfile, ...args: any[]): Promise<void> {
try {
let sqlClusterConnection = SqlClusterLookUp.findSqlClusterConnection(profile, this.appContext);
if (!sqlClusterConnection) {
throw new Error(LocalizedConstants.sparkJobSubmissionNoSqlBigDataClusterFound);
}
let dialog = new SparkJobSubmissionDialog(sqlClusterConnection, this.appContext, this.outputChannel);
await dialog.openDialog();
} catch (error) {
this.appContext.apiWrapper.showErrorMessage(getErrorMessage(error));
}
}
}
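
For context, each command above registers itself under a command ID constant from ../../constants, so the dialog can also be launched programmatically through the VS Code command service. A minimal sketch, with a hypothetical ID value standing in for constants.mssqlClusterLivySubmitSparkJobCommand:

import * as vscode from 'vscode';

// Hypothetical ID value; the real one is defined in ../../constants.
const submitSparkJobCommand = 'mssqlCluster.livy.cmd.submitSparkJob';

async function openSubmissionDialog(): Promise<void> {
    // Invokes the handler registered by OpenSparkJobSubmissionDialogCommand.
    await vscode.commands.executeCommand(submitSparkJobCommand);
}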


@@ -0,0 +1,81 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import * as sqlops from 'sqlops';
import * as nls from 'vscode-nls';
const localize = nls.loadMessageBundle();
import { AppContext } from '../../../appContext';
import { ApiWrapper } from '../../../apiWrapper';
export class SparkAdvancedTab {
private _tab: sqlops.window.modelviewdialog.DialogTab;
public get tab(): sqlops.window.modelviewdialog.DialogTab { return this._tab; }
private _referenceFilesInputBox: sqlops.InputBoxComponent;
private _referenceJARFilesInputBox: sqlops.InputBoxComponent;
private _referencePyFilesInputBox: sqlops.InputBoxComponent;
private get apiWrapper(): ApiWrapper {
return this.appContext.apiWrapper;
}
constructor(private appContext: AppContext) {
this._tab = this.apiWrapper.createTab(localize('sparkJobSubmission_AdvancedTabName', 'ADVANCED'));
this._tab.registerContent(async (modelView) => {
let builder = modelView.modelBuilder;
let parentLayout: sqlops.FormItemLayout = {
horizontal: false,
componentWidth: '400px'
};
let formContainer = builder.formContainer();
this._referenceJARFilesInputBox = builder.inputBox().component();
formContainer.addFormItem({
component: this._referenceJARFilesInputBox,
title: localize('sparkJobSubmission_ReferenceJarList', 'Reference Jars')
},
Object.assign(
{
info: localize('sparkJobSubmission_ReferenceJarListToolTip',
'JARs to be placed in the executor working directory. Each path must be an HDFS path. Separate multiple paths with a semicolon (;).')
},
parentLayout));
this._referencePyFilesInputBox = builder.inputBox().component();
formContainer.addFormItem({
component: this._referencePyFilesInputBox,
title: localize('sparkJobSubmission_ReferencePyList', 'Reference py Files')
},
Object.assign(
{
info: localize('sparkJobSubmission_ReferencePyListTooltip',
'Py files to be placed in the executor working directory. Each path must be an HDFS path. Separate multiple paths with a semicolon (;).')
},
parentLayout));
this._referenceFilesInputBox = builder.inputBox().component();
formContainer.addFormItem({
component: this._referenceFilesInputBox,
title: localize('sparkJobSubmission_ReferenceFilesList', 'Reference Files')
},
Object.assign({
info: localize('sparkJobSubmission_ReferenceFilesListTooltip',
'Files to be placed in the executor working directory. Each path must be an HDFS path. Separate multiple paths with a semicolon (;).')
}, parentLayout));
await modelView.initializeModel(formContainer.component());
});
}
public getInputValues(): string[] {
return [this._referenceJARFilesInputBox.value, this._referencePyFilesInputBox.value, this._referenceFilesInputBox.value];
}
}
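
Each of the three inputs above takes a semicolon-separated list that SparkJobSubmissionService later splits with a plain split(';'). A minimal sketch of that parsing, with per-entry trimming added as an assumption (the service itself does not trim):

// Splits 'hdfs:/a.jar;hdfs:/b.jar' into ['hdfs:/a.jar', 'hdfs:/b.jar'].
function splitPathList(input: string | undefined): string[] {
    if (!input || !input.trim()) {
        return [];
    }
    return input.split(';').map(p => p.trim()).filter(p => p.length > 0);
}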


@@ -0,0 +1,280 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import * as sqlops from 'sqlops';
import * as nls from 'vscode-nls';
import * as fspath from 'path';
import * as fs from 'fs';
import * as vscode from 'vscode';
import * as utils from '../../../utils';
import * as LocalizedConstants from '../../../localizedConstants';
import * as constants from '../../../constants';
import { AppContext } from '../../../appContext';
import { ApiWrapper } from '../../../apiWrapper';
import { SparkJobSubmissionModel } from './sparkJobSubmissionModel';
import { SparkFileSource } from './sparkJobSubmissionService';
const localize = nls.loadMessageBundle();
export class SparkConfigurationTab {
private _tab: sqlops.window.modelviewdialog.DialogTab;
public get tab(): sqlops.window.modelviewdialog.DialogTab { return this._tab; }
private _jobNameInputBox: sqlops.InputBoxComponent;
private _sparkContextLabel: sqlops.TextComponent;
private _fileSourceDropDown: sqlops.DropDownComponent;
private _sparkSourceFileInputBox: sqlops.InputBoxComponent;
private _filePickerButton: sqlops.ButtonComponent;
private _sourceFlexContainer: sqlops.FlexContainer;
private _sourceFlexContainerWithHint: sqlops.FlexContainer;
private _localUploadDestinationLabel: sqlops.TextComponent;
private _mainClassInputBox: sqlops.InputBoxComponent;
private _argumentsInputBox: sqlops.InputBoxComponent;
private get apiWrapper(): ApiWrapper {
return this.appContext.apiWrapper;
}
// If a path is specified, the default source for this tab is an HDFS file; otherwise it is a local file.
constructor(private _dataModel: SparkJobSubmissionModel, private appContext: AppContext, private _path?: string) {
this._tab = this.apiWrapper.createTab(localize('sparkJobSubmission_GeneralTabName', 'GENERAL'));
this._tab.registerContent(async (modelView) => {
let builder = modelView.modelBuilder;
let parentLayout: sqlops.FormItemLayout = {
horizontal: false,
componentWidth: '400px'
};
let formContainer = builder.formContainer();
this._jobNameInputBox = builder.inputBox().withProperties({
placeHolder: localize('sparkJobSubmission_JobNamePlaceHolder', 'Enter a name ...'),
value: (this._path) ? fspath.basename(this._path) : ''
}).component();
formContainer.addFormItem({
component: this._jobNameInputBox,
title: localize('sparkJobSubmission_JobName', 'Job Name'),
required: true
}, parentLayout);
this._sparkContextLabel = builder.text().withProperties({
value: this._dataModel.getSparkClusterUrl()
}).component();
formContainer.addFormItem({
component: this._sparkContextLabel,
title: localize('sparkJobSubmission_SparkCluster', 'Spark Cluster')
}, parentLayout);
this._fileSourceDropDown = builder.dropDown().withProperties<sqlops.DropDownProperties>({
values: [SparkFileSource.Local.toString(), SparkFileSource.HDFS.toString()],
value: (this._path) ? SparkFileSource.HDFS.toString() : SparkFileSource.Local.toString()
}).component();
this._fileSourceDropDown.onValueChanged(selection => {
let isLocal = selection.selected === SparkFileSource.Local.toString();
// Disable the file picker button for the HDFS source.
if (this._filePickerButton) {
this._filePickerButton.updateProperties({
enabled: isLocal,
required: isLocal
});
}
// Clear the path when switching sources.
if (this._sparkSourceFileInputBox) {
this._sparkSourceFileInputBox.value = '';
}
if (this._localUploadDestinationLabel) {
if (isLocal) {
this._localUploadDestinationLabel.value = LocalizedConstants.sparkLocalFileDestinationHint;
} else {
this._localUploadDestinationLabel.value = '';
}
}
});
this._sparkSourceFileInputBox = builder.inputBox().withProperties({
required: true,
placeHolder: localize('sparkJobSubmission_FilePathPlaceHolder', 'Path to a .jar or .py file'),
value: (this._path) ? this._path : ''
}).component();
this._sparkSourceFileInputBox.onTextChanged(text => {
if (this._fileSourceDropDown.value === SparkFileSource.Local.toString()) {
this._dataModel.updateModelByLocalPath(text);
if (this._localUploadDestinationLabel) {
if (text) {
this._localUploadDestinationLabel.value = localize('sparkJobSubmission_LocalFileDestinationHintWithPath',
'The selected local file will be uploaded to HDFS: {0}', this._dataModel.hdfsSubmitFilePath);
} else {
this._localUploadDestinationLabel.value = LocalizedConstants.sparkLocalFileDestinationHint;
}
}
} else {
this._dataModel.hdfsSubmitFilePath = text;
}
// Main Class is enabled and required only for jar files.
let isJarFile = this._dataModel.isJarFile();
this._mainClassInputBox.updateProperties({ enabled: isJarFile, required: isJarFile });
if (!isJarFile) {
// Clear main class for py file.
this._mainClassInputBox.value = '';
}
});
this._filePickerButton = builder.button().withProperties({
required: !this._path,
enabled: !this._path,
label: '•••',
width: constants.mssqlClusterSparkJobFileSelectorButtonWidth,
height: constants.mssqlClusterSparkJobFileSelectorButtonHeight
}).component();
this._filePickerButton.onDidClick(() => this.onSelectFile());
this._sourceFlexContainer = builder.flexContainer().component();
this._sourceFlexContainer.addItem(this._fileSourceDropDown, { flex: '0 0 auto', CSSStyles: { 'minWidth': '75px', 'marginBottom': '5px', 'paddingRight': '3px' } });
this._sourceFlexContainer.addItem(this._sparkSourceFileInputBox, { flex: '1 1 auto', CSSStyles: { 'marginBottom': '5px', 'paddingRight': '3px' } });
// Do not add a margin to the file picker button; the label already forces a 5px margin.
this._sourceFlexContainer.addItem(this._filePickerButton, { flex: '0 0 auto' });
this._sourceFlexContainer.setLayout({
flexFlow: 'row',
height: '100%',
justifyContent: 'center',
alignItems: 'center',
alignContent: 'stretch'
});
this._localUploadDestinationLabel = builder.text().withProperties({
value: (this._path) ? '' : LocalizedConstants.sparkLocalFileDestinationHint
}).component();
this._sourceFlexContainerWithHint = builder.flexContainer().component();
this._sourceFlexContainerWithHint.addItem(this._sourceFlexContainer, { flex: '0 0 auto' });
this._sourceFlexContainerWithHint.addItem(this._localUploadDestinationLabel, { flex: '1 1 auto' });
this._sourceFlexContainerWithHint.setLayout({
flexFlow: 'column',
width: '100%',
justifyContent: 'center',
alignItems: 'stretch',
alignContent: 'stretch'
});
formContainer.addFormItem({
component: this._sourceFlexContainerWithHint,
title: localize('sparkJobSubmission_MainFilePath', 'JAR/py File'),
required: true
}, parentLayout);
this._mainClassInputBox = builder.inputBox().component();
formContainer.addFormItem({
component: this._mainClassInputBox,
title: localize('sparkJobSubmission_MainClass', 'Main Class'),
required: true
}, parentLayout);
this._argumentsInputBox = builder.inputBox().component();
formContainer.addFormItem({
component: this._argumentsInputBox,
title: localize('sparkJobSubmission_Arguments', 'Arguments')
},
Object.assign(
{ info: localize('sparkJobSubmission_ArgumentsTooltip', 'Command line arguments used in your main class, multiple arguments should be split by space.') },
parentLayout));
await modelView.initializeModel(formContainer.component());
});
}
public async validate(): Promise<boolean> {
if (!this._jobNameInputBox.value) {
this._dataModel.showDialogError(localize('sparkJobSubmission_NotSpecifyJobName', 'Property Job Name is not specified.'));
return false;
}
if (this._fileSourceDropDown.value === SparkFileSource.Local.toString()) {
if (this._sparkSourceFileInputBox.value) {
this._dataModel.isMainSourceFromLocal = true;
this._dataModel.updateModelByLocalPath(this._sparkSourceFileInputBox.value);
} else {
this._dataModel.showDialogError(localize('sparkJobSubmission_NotSpecifyJARPYPath', 'Property JAR/py File is not specified.'));
return false;
}
} else {
if (this._sparkSourceFileInputBox.value) {
this._dataModel.isMainSourceFromLocal = false;
this._dataModel.hdfsSubmitFilePath = this._sparkSourceFileInputBox.value;
} else {
this._dataModel.showDialogError(localize('sparkJobSubmission_NotSpecifyJARPYPath', 'Property JAR/py File is not specified.'));
return false;
}
}
if (this._dataModel.isJarFile() && !this._mainClassInputBox.value) {
this._dataModel.showDialogError(localize('sparkJobSubmission_NotSpecifyMainClass', 'Property Main Class is not specified.'));
return false;
}
// 1. For a local file source, check that the file exists.
if (this._dataModel.isMainSourceFromLocal) {
if (!fs.existsSync(this._dataModel.localFileSourcePath)) {
this._dataModel.showDialogError(LocalizedConstants.sparkJobSubmissionLocalFileNotExisted(this._dataModel.localFileSourcePath));
return false;
}
} else {
// 2. For an HDFS source, check that the file exists on the cluster.
try {
let isFileExisted = await this._dataModel.isClusterFileExisted(this._dataModel.hdfsSubmitFilePath);
if (!isFileExisted) {
this._dataModel.showDialogError(localize('sparkJobSubmission_HDFSFileNotExistedWithPath', '{0} does not exist on the cluster.', this._dataModel.hdfsSubmitFilePath));
return false;
}
} catch (error) {
this._dataModel.showDialogError(localize('sparkJobSubmission_HDFSFileNotExisted', 'The specified HDFS file does not exist.'));
return false;
}
}
return true;
}
private async onSelectFile(): Promise<void> {
let filePath = await this.pickFile();
if (filePath) {
this._sparkSourceFileInputBox.value = filePath;
}
}
public getInputValues(): string[] {
return [this._jobNameInputBox.value, this._mainClassInputBox.value, this._argumentsInputBox.value];
}
public async pickFile(): Promise<string> {
try {
let filter = { 'JAR/py files': ['jar', 'py'] };
let options: vscode.OpenDialogOptions = {
canSelectFiles: true,
canSelectFolders: false,
canSelectMany: false,
openLabel: localize('sparkSelectLocalFile', 'Select'),
filters: filter
};
let fileUris: vscode.Uri[] = await this.apiWrapper.showOpenDialog(options);
if (fileUris && fileUris[0]) {
return fileUris[0].fsPath;
}
return undefined;
} catch (err) {
this.apiWrapper.showErrorMessage(localize('sparkJobSubmission_SelectFileError', 'Error locating the file: {0}', utils.getErrorMessage(err)));
return undefined;
}
}
}
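
validate() above interleaves UI reads with model updates, but the rules it enforces are compact. A sketch of the same pre-file-system checks as a pure function (names and shape are illustrative, not part of the extension):

interface GeneralConfig {
    jobName: string;
    filePath: string;
    mainClass: string;
}

// Returns the first validation error, or undefined when the inputs pass.
function firstValidationError(cfg: GeneralConfig): string | undefined {
    if (!cfg.jobName) { return 'Property Job Name is not specified.'; }
    if (!cfg.filePath) { return 'Property JAR/py File is not specified.'; }
    if (cfg.filePath.toLowerCase().endsWith('.jar') && !cfg.mainClass) {
        return 'Property Main Class is not specified.';
    }
    return undefined;
}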


@@ -0,0 +1,168 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import * as sqlops from 'sqlops';
import * as vscode from 'vscode';
import * as nls from 'vscode-nls';
import * as utils from '../../../utils';
import * as LocalizedConstants from '../../../localizedConstants';
import { AppContext } from '../../../appContext';
import { ApiWrapper } from '../../../apiWrapper';
import { SparkJobSubmissionModel } from './sparkJobSubmissionModel';
import { SparkConfigurationTab } from './sparkConfigurationTab';
import { SparkJobSubmissionInput } from './sparkJobSubmissionService';
import { SparkAdvancedTab } from './sparkAdvancedTab';
import { SqlClusterConnection } from '../../../objectExplorerNodeProvider/connection';
const localize = nls.loadMessageBundle();
export class SparkJobSubmissionDialog {
private _dialog: sqlops.window.modelviewdialog.Dialog;
private _dataModel: SparkJobSubmissionModel;
private _sparkConfigTab: SparkConfigurationTab;
private _sparkAdvancedTab: SparkAdvancedTab;
private get apiWrapper(): ApiWrapper {
return this.appContext.apiWrapper;
}
constructor(
private sqlClusterConnection: SqlClusterConnection,
private appContext: AppContext,
private outputChannel: vscode.OutputChannel) {
if (!this.sqlClusterConnection || !this.appContext || !this.outputChannel) {
throw new Error(localize('sparkJobSubmission_SparkJobSubmissionDialogInitializeError',
'Invalid parameters for SparkJobSubmissionDialog.'));
}
}
public async openDialog(path?: string): Promise<void> {
this._dialog = this.apiWrapper.createDialog(localize('sparkJobSubmission_DialogTitleNewJob', 'New Job'));
this._dataModel = new SparkJobSubmissionModel(this.sqlClusterConnection, this._dialog, this.appContext);
this._sparkConfigTab = new SparkConfigurationTab(this._dataModel, this.appContext, path);
this._sparkAdvancedTab = new SparkAdvancedTab(this.appContext);
this._dialog.content = [this._sparkConfigTab.tab, this._sparkAdvancedTab.tab];
this._dialog.cancelButton.label = localize('sparkJobSubmission_DialogCancelButton', 'Cancel');
this._dialog.okButton.label = localize('sparkJobSubmission_DialogSubmitButton', 'Submit');
this._dialog.okButton.onClick(() => this.onClickOk());
this._dialog.registerCloseValidator(() => this.handleValidate());
await this.apiWrapper.openDialog(this._dialog);
}
private onClickOk(): void {
let jobName = localize('sparkJobSubmission_SubmitSparkJob', '{0} Spark Job Submission:',
this._sparkConfigTab.getInputValues()[0]);
this.apiWrapper.startBackgroundOperation(
{
connection: this.sqlClusterConnection.connection,
displayName: jobName,
description: jobName,
isCancelable: false,
operation: op => {
this.onSubmit(op);
}
}
);
}
private async onSubmit(op: sqlops.BackgroundOperation): Promise<void> {
try {
this.outputChannel.show();
let msg = localize('sparkJobSubmission_SubmissionStartMessage',
'.......................... Submit Spark Job Start ..........................');
this.outputChannel.appendLine(msg);
// 1. Upload local file to HDFS for local source.
if (this._dataModel.isMainSourceFromLocal) {
try {
this.outputChannel.appendLine(this.addInfoTag(LocalizedConstants.sparkJobSubmissionPrepareUploadingFile(this._dataModel.localFileSourcePath, this._dataModel.hdfsFolderDestinationPath)));
op.updateStatus(sqlops.TaskStatus.InProgress, LocalizedConstants.sparkJobSubmissionPrepareUploadingFile(this._dataModel.localFileSourcePath, this._dataModel.hdfsFolderDestinationPath));
await this._dataModel.uploadFile(this._dataModel.localFileSourcePath, this._dataModel.hdfsFolderDestinationPath);
vscode.window.showInformationMessage(LocalizedConstants.sparkJobSubmissionUploadingFileSucceeded);
this.outputChannel.appendLine(this.addInfoTag(LocalizedConstants.sparkJobSubmissionUploadingFileSucceeded));
op.updateStatus(sqlops.TaskStatus.InProgress, LocalizedConstants.sparkJobSubmissionUploadingFileSucceeded);
} catch (error) {
vscode.window.showErrorMessage(LocalizedConstants.sparkJobSubmissionUploadingFileFailed(utils.getErrorMessage(error)));
this.outputChannel.appendLine(this.addErrorTag(LocalizedConstants.sparkJobSubmissionUploadingFileFailed(utils.getErrorMessage(error))));
op.updateStatus(sqlops.TaskStatus.Failed, LocalizedConstants.sparkJobSubmissionUploadingFileFailed(utils.getErrorMessage(error)));
this.outputChannel.appendLine(LocalizedConstants.sparkJobSubmissionEndMessage);
return;
}
}
// 2. Submit job to cluster.
let submissionSettings: SparkJobSubmissionInput = this.getSubmissionInput();
this.outputChannel.appendLine(this.addInfoTag(LocalizedConstants.sparkJobSubmissionPrepareSubmitJob(submissionSettings.jobName)));
op.updateStatus(sqlops.TaskStatus.InProgress, LocalizedConstants.sparkJobSubmissionPrepareSubmitJob(submissionSettings.jobName));
let livyBatchId = await this._dataModel.submitBatchJobByLivy(submissionSettings);
vscode.window.showInformationMessage(LocalizedConstants.sparkJobSubmissionSparkJobHasBeenSubmitted);
this.outputChannel.appendLine(this.addInfoTag(LocalizedConstants.sparkJobSubmissionSparkJobHasBeenSubmitted));
op.updateStatus(sqlops.TaskStatus.InProgress, LocalizedConstants.sparkJobSubmissionSparkJobHasBeenSubmitted);
// 3. Get SparkHistory/YarnUI Url.
try {
let appId = await this._dataModel.getApplicationID(submissionSettings, livyBatchId);
let sparkHistoryUrl = this._dataModel.generateSparkHistoryUIUrl(submissionSettings, appId);
vscode.window.showInformationMessage(LocalizedConstants.sparkJobSubmissionSparkHistoryLinkMessage(sparkHistoryUrl));
this.outputChannel.appendLine(this.addInfoTag(LocalizedConstants.sparkJobSubmissionSparkHistoryLinkMessage(sparkHistoryUrl)));
op.updateStatus(sqlops.TaskStatus.Succeeded, LocalizedConstants.sparkJobSubmissionSparkHistoryLinkMessage(sparkHistoryUrl));
/*
// The Spark tracking URL does not currently work.
let sparkTrackingUrl = this._dataModel.generateSparkTrackingUIUrl(submissionSettings, appId);
vscode.window.showInformationMessage(LocalizedConstants.sparkJobSubmissionTrackingLinkMessage(sparkTrackingUrl));
this.outputChannel.appendLine(this.addInfoTag(LocalizedConstants.sparkJobSubmissionTrackingLinkMessage(sparkTrackingUrl)));
op.updateStatus(sqlops.TaskStatus.Succeeded, LocalizedConstants.sparkJobSubmissionTrackingLinkMessage(sparkTrackingUrl));
*/
let yarnUIUrl = this._dataModel.generateYarnUIUrl(submissionSettings, appId);
vscode.window.showInformationMessage(LocalizedConstants.sparkJobSubmissionYarnUIMessage(yarnUIUrl));
this.outputChannel.appendLine(this.addInfoTag(LocalizedConstants.sparkJobSubmissionYarnUIMessage(yarnUIUrl)));
op.updateStatus(sqlops.TaskStatus.Succeeded, LocalizedConstants.sparkJobSubmissionYarnUIMessage(yarnUIUrl));
} catch (error) {
vscode.window.showErrorMessage(LocalizedConstants.sparkJobSubmissionGetApplicationIdFailed(utils.getErrorMessage(error)));
this.outputChannel.appendLine(this.addErrorTag(LocalizedConstants.sparkJobSubmissionGetApplicationIdFailed(utils.getErrorMessage(error))));
op.updateStatus(sqlops.TaskStatus.Failed, LocalizedConstants.sparkJobSubmissionGetApplicationIdFailed(utils.getErrorMessage(error)));
this.outputChannel.appendLine(LocalizedConstants.sparkJobSubmissionEndMessage);
return;
}
this.outputChannel.appendLine(LocalizedConstants.sparkJobSubmissionEndMessage);
} catch (error) {
vscode.window.showErrorMessage(LocalizedConstants.sparkJobSubmissionSubmitJobFailed(utils.getErrorMessage(error)));
this.outputChannel.appendLine(this.addErrorTag(LocalizedConstants.sparkJobSubmissionSubmitJobFailed(utils.getErrorMessage(error))));
op.updateStatus(sqlops.TaskStatus.Failed, LocalizedConstants.sparkJobSubmissionSubmitJobFailed(utils.getErrorMessage(error)));
this.outputChannel.appendLine(LocalizedConstants.sparkJobSubmissionEndMessage);
}
}
private async handleValidate(): Promise<boolean> {
return this._sparkConfigTab.validate();
}
private getSubmissionInput(): SparkJobSubmissionInput {
let generalConfig = this._sparkConfigTab.getInputValues();
let advancedConfig = this._sparkAdvancedTab.getInputValues();
return new SparkJobSubmissionInput(generalConfig[0], this._dataModel.hdfsSubmitFilePath, generalConfig[1], generalConfig[2],
advancedConfig[0], advancedConfig[1], advancedConfig[2]);
}
private addInfoTag(info: string): string {
return `[Info] ${info}`;
}
private addErrorTag(error: string): string {
return `[Error] ${error}`;
}
}
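
onClickOk and onSubmit above follow the background-operation pattern: start a named operation, then stream progress through op.updateStatus until a terminal Succeeded or Failed state. A stripped-down sketch of that shape, assuming the sqlops.tasks API that ApiWrapper.startBackgroundOperation wraps:

import * as sqlops from 'sqlops';

function runAsBackgroundOperation(
    displayName: string,
    work: (op: sqlops.BackgroundOperation) => Promise<void>): void {
    sqlops.tasks.startBackgroundOperation({
        displayName: displayName,
        description: displayName,
        isCancelable: false,
        // Progress and the final Succeeded/Failed state are reported via
        // op.updateStatus inside the async work callback.
        operation: op => { void work(op); }
    });
}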


@@ -0,0 +1,206 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import * as sqlops from 'sqlops';
import * as nls from 'vscode-nls';
const localize = nls.loadMessageBundle();
import * as fs from 'fs';
import * as fspath from 'path';
import * as os from 'os';
import * as constants from '../../../constants';
import { SqlClusterConnection } from '../../../objectExplorerNodeProvider/connection';
import * as LocalizedConstants from '../../../localizedConstants';
import * as utils from '../../../utils';
import { SparkJobSubmissionService, SparkJobSubmissionInput, LivyLogResponse } from './sparkJobSubmissionService';
import { AppContext } from '../../../appContext';
import { IFileSource, File, joinHdfsPath } from '../../../objectExplorerNodeProvider/fileSources';
// Stores important state and service methods used by the Spark Job Submission Dialog.
export class SparkJobSubmissionModel {
private _dialogService: SparkJobSubmissionService;
private _guidForClusterFolder: string;
public get guidForClusterFolder(): string { return this._guidForClusterFolder; }
// Whether the main file comes from the local file system or from HDFS
public isMainSourceFromLocal: boolean;
// The final HDFS path that will be submitted
public hdfsSubmitFilePath: string;
// Paths used when uploading a local file: the source file and the destination folder
public localFileSourcePath: string;
public hdfsFolderDestinationPath: string;
constructor(
private readonly _sqlClusterConnection: SqlClusterConnection,
private readonly _dialog: sqlops.window.modelviewdialog.Dialog,
private readonly _appContext: AppContext,
requestService?: (args: any) => any) {
if (!this._sqlClusterConnection || !this._dialog || !this._appContext) {
throw new Error(localize('sparkJobSubmission_SparkJobSubmissionModelInitializeError',
'Invalid parameters for SparkJobSubmissionModel.'));
}
this._dialogService = new SparkJobSubmissionService(requestService);
this._guidForClusterFolder = utils.generateGuid();
}
public get connection(): SqlClusterConnection { return this._sqlClusterConnection; }
public get dialogService(): SparkJobSubmissionService { return this._dialogService; }
public get dialog(): sqlops.window.modelviewdialog.Dialog { return this._dialog; }
public isJarFile(): boolean {
if (this.hdfsSubmitFilePath) {
return this.hdfsSubmitFilePath.toLowerCase().endsWith('.jar');
}
return false;
}
public showDialogError(message: string): void {
let errorLevel = sqlops.window.modelviewdialog.MessageLevel ? sqlops.window.modelviewdialog.MessageLevel : 0;
this._dialog.message = {
text: message,
level: <sqlops.window.modelviewdialog.MessageLevel>errorLevel
};
}
public showDialogInfo(message: string): void {
let infoLevel = sqlops.window.modelviewdialog.MessageLevel ? sqlops.window.modelviewdialog.MessageLevel.Information : 2;
this._dialog.message = {
text: message,
level: infoLevel
};
}
public getSparkClusterUrl(): string {
if (this._sqlClusterConnection && this._sqlClusterConnection.host && this._sqlClusterConnection.port) {
return `https://${this._sqlClusterConnection.host}:${this._sqlClusterConnection.port}`;
}
// Safety check only; this should not happen when the model is initialized correctly.
return '';
}
public async submitBatchJobByLivy(submissionArgs: SparkJobSubmissionInput): Promise<string> {
try {
if (!submissionArgs) {
return Promise.reject(localize('sparkJobSubmission_submissionArgsIsInvalid', 'submissionArgs is invalid. '));
}
submissionArgs.setSparkClusterInfo(this._sqlClusterConnection);
let livyBatchId = await this._dialogService.submitBatchJob(submissionArgs);
return livyBatchId;
} catch (error) {
return Promise.reject(error);
}
}
public async getApplicationID(submissionArgs: SparkJobSubmissionInput, livyBatchId: string, retryTime?: number): Promise<string> {
// TODO: consider whether to set the timeout to 15000 ms.
try {
if (!submissionArgs) {
return Promise.reject(localize('sparkJobSubmission_submissionArgsIsInvalid', 'submissionArgs is invalid. '));
}
if (!utils.isValidNumber(livyBatchId)) {
return Promise.reject(new Error(localize('sparkJobSubmission_LivyBatchIdIsInvalid', 'livyBatchId is invalid. ')));
}
if (!retryTime) {
retryTime = constants.mssqlClusterLivyRetryTimesForCheckYarnApp;
}
submissionArgs.setSparkClusterInfo(this._sqlClusterConnection);
let response: LivyLogResponse = undefined;
let timeOutCount: number = 0;
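// Poll Livy at a fixed interval until the YARN application id appears in the batch log or the retry budget runs out.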
do {
timeOutCount++;
await this.sleep(constants.mssqlClusterLivyTimeInMSForCheckYarnApp);
response = await this._dialogService.getYarnAppId(submissionArgs, livyBatchId);
} while (response.appId === '' && timeOutCount < retryTime);
if (response.appId === '') {
return Promise.reject(localize('sparkJobSubmission_GetApplicationIdTimeOut', 'Timed out getting the application id. {0}[Log] {1}', os.EOL, response.log));
} else {
return response.appId;
}
} catch (error) {
return Promise.reject(error);
}
}
public async uploadFile(localFilePath: string, hdfsFolderPath: string): Promise<void> {
try {
if (!localFilePath || !hdfsFolderPath) {
return Promise.reject(localize('sparkJobSubmission_localFileOrFolderNotSpecified.', 'Property localFilePath or hdfsFolderPath is not specified. '));
}
if (!fs.existsSync(localFilePath)) {
return Promise.reject(LocalizedConstants.sparkJobSubmissionLocalFileNotExisted(localFilePath));
}
let fileSource: IFileSource = this._sqlClusterConnection.createHdfsFileSource();
await fileSource.writeFile(new File(localFilePath, false), hdfsFolderPath);
} catch (error) {
return Promise.reject(error);
}
}
public async isClusterFileExisted(path: string): Promise<boolean> {
try {
if (!path) {
return Promise.reject(localize('sparkJobSubmission_PathNotSpecified.', 'Property Path is not specified. '));
}
let fileSource: IFileSource = this._sqlClusterConnection.createHdfsFileSource();
return await fileSource.exists(path);
} catch (error) {
return Promise.reject(error);
}
}
public updateModelByLocalPath(localPath: string): void {
if (localPath) {
this.localFileSourcePath = localPath;
this.hdfsFolderDestinationPath = this.generateDestinationFolder();
let fileName = fspath.basename(localPath);
this.hdfsSubmitFilePath = joinHdfsPath(this.hdfsFolderDestinationPath, fileName);
} else {
this.hdfsSubmitFilePath = '';
}
}
// Example path: /SparkSubmission/2018/8/21/b682a6c4-1954-401e-8542-9c573d69d9c0/default_artifact.jar (month and day are not zero-padded)
private generateDestinationFolder(): string {
let day = new Date();
return `/SparkSubmission/${day.getUTCFullYear()}/${day.getUTCMonth() + 1}/${day.getUTCDate()}/${this._guidForClusterFolder}`;
}
// Example: https://host:30443/gateway/default/yarn/cluster/app/application_1532646201938_0057
public generateYarnUIUrl(submissionArgs: SparkJobSubmissionInput, appId: string): string {
return `https://${submissionArgs.host}:${submissionArgs.port}/gateway/default/yarn/cluster/app/${appId}`;
}
// Example: https://host:30443/gateway/default/yarn/proxy/application_1532646201938_0411
public generateSparkTrackingUIUrl(submissionArgs: SparkJobSubmissionInput, appId: string): string {
return `https://${submissionArgs.host}:${submissionArgs.port}/gateway/default/yarn/proxy/${appId}`;
}
// Example: https://host:30443/gateway/default/sparkhistory/history/application_1532646201938_0057/1
public generateSparkHistoryUIUrl(submissionArgs: SparkJobSubmissionInput, appId: string): string {
return `https://${submissionArgs.host}:${submissionArgs.port}/gateway/default/sparkhistory/history/${appId}/1`;
}
private async sleep(ms: number): Promise<void> {
// tslint:disable-next-line no-string-based-set-timeout
return new Promise(resolve => setTimeout(resolve, ms));
}
}
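
getApplicationID above hand-rolls its sleep-and-retry loop; the same idea as a reusable helper, sketched here for illustration only (not part of the extension):

// Retries an async probe at a fixed interval, resolving with the first
// defined result, or undefined once the attempts are exhausted.
async function pollUntil<T>(
    probe: () => Promise<T | undefined>,
    attempts: number,
    intervalMs: number): Promise<T | undefined> {
    for (let i = 0; i < attempts; i++) {
        await new Promise<void>(resolve => setTimeout(resolve, intervalMs));
        const result = await probe();
        if (result !== undefined) {
            return result;
        }
    }
    return undefined;
}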


@@ -0,0 +1,187 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import * as os from 'os';
import * as nls from 'vscode-nls';
const localize = nls.loadMessageBundle();
import * as constants from '../../../constants';
import { SqlClusterConnection } from '../../../objectExplorerNodeProvider/connection';
import * as utils from '../../../utils';
export class SparkJobSubmissionService {
private _requestPromise: (args: any) => any;
constructor(
requestService?: (args: any) => any) {
if (requestService) {
// Allows a fake request service to be injected for testing.
this._requestPromise = requestService;
} else {
this._requestPromise = require('request-promise');
}
}
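// POSTs a Livy batch descriptor built from submissionArgs and returns the new batch id. Certificate validation is disabled below as a temporary workaround (see the TODO).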
public async submitBatchJob(submissionArgs: SparkJobSubmissionInput): Promise<string> {
try {
let livyUrl: string = `https://${submissionArgs.host}:${submissionArgs.port}${submissionArgs.livyPath}/`;
let options = {
uri: livyUrl,
method: 'POST',
json: true,
// TODO: re-enable certificate validation once the service's authentication changes.
rejectUnauthorized: false,
body: {
file: submissionArgs.sparkFile,
proxyUser: submissionArgs.user,
className: submissionArgs.mainClass,
name: submissionArgs.jobName
},
// authentication headers
headers: {
'Authorization': 'Basic ' + Buffer.from(submissionArgs.user + ':' + submissionArgs.password).toString('base64')
}
};
// Set arguments
if (submissionArgs.jobArguments && submissionArgs.jobArguments.trim()) {
let argsList = submissionArgs.jobArguments.split(' ');
if (argsList.length > 0) {
options.body['args'] = argsList;
}
}
// Set jar files
if (submissionArgs.jarFileList && submissionArgs.jarFileList.trim()) {
let jarList = submissionArgs.jarFileList.split(';');
if (jarList.length > 0) {
options.body['jars'] = jarList;
}
}
// Set py files
if (submissionArgs.pyFileList && submissionArgs.pyFileList.trim()) {
let pyList = submissionArgs.pyFileList.split(';');
if (pyList.length > 0) {
options.body['pyFiles'] = pyList;
}
}
// Set other files
if (submissionArgs.otherFileList && submissionArgs.otherFileList.trim()) {
let otherList = submissionArgs.otherFileList.split(';');
if (otherList.length > 0) {
options.body['files'] = otherList;
}
}
const response = await this._requestPromise(options);
if (response && utils.isValidNumber(response.id)) {
return response.id;
}
return Promise.reject(new Error(localize('sparkJobSubmission_LivyNoBatchIdReturned',
'No Spark job batch id was returned in the response.{0}[Error] {1}', os.EOL, JSON.stringify(response))));
} catch (error) {
return Promise.reject(error);
}
}
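// Fetches the Livy batch log for the given batch id and scrapes the YARN application id out of it.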
public async getYarnAppId(submissionArgs: SparkJobSubmissionInput, livyBatchId: string): Promise<LivyLogResponse> {
try {
let livyUrl = `https://${submissionArgs.host}:${submissionArgs.port}${submissionArgs.livyPath}/${livyBatchId}/log`;
let options = {
uri: livyUrl,
method: 'GET',
json: true,
rejectUnauthorized: false,
// authentication headers
headers: {
'Authorization': 'Basic ' + Buffer.from(submissionArgs.user + ':' + submissionArgs.password).toString('base64')
}
};
const response = await this._requestPromise(options);
if (response && response.log) {
return this.extractYarnAppIdFromLog(response.log);
}
return Promise.reject(localize('sparkJobSubmission_LivyNoLogReturned',
'No log was returned in the response.{0}[Error] {1}', os.EOL, JSON.stringify(response)));
} catch (error) {
return Promise.reject(error);
}
}
private extractYarnAppIdFromLog(log: any): LivyLogResponse {
// Livy may return the log as a single string or as an array of lines.
let entries: string[] = Array.isArray(log) ? log : [log];
let logForPrint = entries.join(os.EOL);
// eg: '18/08/23 11:02:50 INFO yarn.Client: Application report for application_1532646201938_0182 (state: ACCEPTED)'
for (let entry of entries) {
if (entry.indexOf('Application report for') >= 0 && entry.indexOf('(state: ACCEPTED)') >= 0) {
let tokens = entry.split(' ');
for (let token of tokens) {
if (token.startsWith('application_')) {
return new LivyLogResponse(logForPrint, token);
}
}
}
}
return new LivyLogResponse(logForPrint, '');
}
}
export class SparkJobSubmissionInput {
public setSparkClusterInfo(sqlClusterConnection: SqlClusterConnection): void {
this._host = sqlClusterConnection.host;
this._port = sqlClusterConnection.port;
this._livyPath = constants.mssqlClusterLivySubmitPath;
this._user = sqlClusterConnection.user;
this._passWord = sqlClusterConnection.password;
}
constructor(
private readonly _jobName: string,
private readonly _sparkFile: string,
private readonly _mainClass: string,
private readonly _arguments: string,
private readonly _jarFileList: string,
private readonly _pyFileList: string,
private readonly _otherFileList: string,
private _host?: string,
private _port?: string,
private _livyPath?: string,
private _user?: string,
private _passWord?: string) {
}
public get jobName(): string { return this._jobName; }
public get sparkFile(): string { return this._sparkFile; }
public get mainClass(): string { return this._mainClass; }
public get jobArguments(): string { return this._arguments; }
public get jarFileList(): string { return this._jarFileList; }
public get otherFileList(): string { return this._otherFileList; }
public get pyFileList(): string { return this._pyFileList; }
public get host(): string { return this._host; }
public get port(): string { return this._port; }
public get livyPath(): string { return this._livyPath; }
public get user(): string { return this._user; }
public get password(): string { return this._passWord; }
}
export enum SparkFileSource {
HDFS = 'HDFS',
Local = 'Local'
}
export class LivyLogResponse {
constructor(public log: string, public appId: string) { }
}
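
For reference, the Livy batches API that both requests above target exchanges JSON of roughly this shape (field names follow the Apache Livy REST docs; the gateway-prefixed URL comes from constants.mssqlClusterLivySubmitPath):

// Request body for POST {livyPath}/, as assembled in submitBatchJob.
interface LivyBatchRequest {
    file: string;        // HDFS path to the main .jar or .py file
    className?: string;  // main class, required for jar submissions
    name?: string;       // display name of the job
    proxyUser?: string;  // user the job runs as
    args?: string[];     // command line arguments
    jars?: string[];     // jars for the executor working directory
    pyFiles?: string[];  // py files for the executor working directory
    files?: string[];    // other reference files
}

// Response from Livy; id feeds the later GET {livyPath}/{id}/log polling.
interface LivyBatchResponse {
    id: number;
    state: string;       // e.g. 'starting', 'running', 'success'
    appId: string | null;
    log: string[];
}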