diff --git a/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts b/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts index c484ce4ce6..08b3d82f49 100644 --- a/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts +++ b/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts @@ -84,7 +84,7 @@ export class UploadFilesCommand extends ProgressCommand { if (fileUris) { let files: IFile[] = fileUris.map(uri => uri.fsPath).map(this.mapPathsToFiles()); await this.executeWithProgress( - async (cancelToken: vscode.CancellationTokenSource) => this.writeFiles(files, folderNode, cancelToken), + (cancelToken: vscode.CancellationTokenSource) => this.writeFiles(files, folderNode, cancelToken), localize('uploading', 'Uploading files to HDFS'), true, () => this.apiWrapper.showInformationMessage(localize('uploadCanceled', 'Upload operation was canceled'))); if (context.type === constants.ObjectExplorerService) { @@ -264,7 +264,7 @@ export class SaveFileCommand extends ProgressCommand { }); if (fileUri) { await this.executeWithProgress( - async (cancelToken: vscode.CancellationTokenSource) => this.doSaveAndOpen(fileUri, fileNode, cancelToken), + (cancelToken: vscode.CancellationTokenSource) => this.doSaveAndOpen(fileUri, fileNode, cancelToken), localize('saving', 'Saving HDFS Files'), true, () => this.apiWrapper.showInformationMessage(localize('saveCanceled', 'Save operation was canceled'))); } diff --git a/extensions/mssql/src/objectExplorerNodeProvider/webhdfs.ts b/extensions/mssql/src/objectExplorerNodeProvider/webhdfs.ts index 0fb64b6d78..b355b7e112 100644 --- a/extensions/mssql/src/objectExplorerNodeProvider/webhdfs.ts +++ b/extensions/mssql/src/objectExplorerNodeProvider/webhdfs.ts @@ -15,6 +15,16 @@ import { IHdfsOptions, IRequestParams } from './fileSources'; const localize = nls.loadMessageBundle(); const ErrorMessageInvalidDataStructure = localize('webhdfs.invalidDataStructure', "Invalid Data Structure"); +const emitError = (instance, err) => { + const isErrorEmitted = instance.errorEmitted; + + if (!isErrorEmitted) { + instance.emit('error', err); + instance.emit('finish'); + } + + instance.errorEmitted = true; +}; export class WebHDFS { private _requestParams: IRequestParams; private _opts: IHdfsOptions; @@ -544,31 +554,55 @@ export class WebHDFS { return this.doCreateWriteStream(params); } // Else, must add kerberos token and handle redirects + return this.createKerberosWriteStream(params); + } + + private createKerberosWriteStream(params: any): fs.WriteStream { params.followRedirect = false; - let replyStream = through(); + // Create an intermediate stream that pauses until we get a positive + // response from the server + let isWaiting = true; + let firstCb: Function = undefined; + let replyStream = through(function (chunk, enc, cb) { + this.push(chunk, enc); + if (isWaiting) { + firstCb = cb; + } else { + cb(); + } + }); let handleErr = (err) => { replyStream.emit('error', err); replyStream.end(); }; let initRedirectedStream = () => { - let redirectedStream = this.doCreateWriteStream(params); - replyStream.pipe(redirectedStream); + // After redirect, create valid stream to correct location + // and pipe the intermediate stream to it, unblocking the data flow + params.headers['content-type'] = 'application/octet-stream'; + let upload = request(params, (err, res, bo) => { + if (err || this.isError(res)) { + emitError(replyStream, this.parseError(res, bo, err)); + replyStream.end(); + } + else if (res.headers.hasOwnProperty('location')) { + replyStream.emit('finish', res.headers.location); + } + else { + replyStream.emit('finish'); + } + }); + isWaiting = false; + replyStream.pipe(upload); + if (firstCb) { + firstCb(); + } }; this.requestWithRedirectAndAuth(params, initRedirectedStream, handleErr); return replyStream; } private doCreateWriteStream(params: any): fs.WriteStream { - let emitError = (instance, err) => { - const isErrorEmitted = instance.errorEmitted; - if (!isErrorEmitted) { - instance.emit('error', err); - instance.emit('finish'); - } - - instance.errorEmitted = true; - }; let canResume: boolean = true; let stream = undefined; let req = request(params, (error, response, body) => { @@ -680,6 +714,8 @@ export class WebHDFS { this.setKerberosAuthOnParams(params) .then(onRedirected) .catch(handleErr); + } else { + handleErr(err); } }); }