Fix HDFS AD write support + improve error logging (#6855)

* Fix HDFS AD write support + improve error logging
- Implemented write stream piping correctly so data is paused until we're ready to push to server
- During testing, noted that things weren't awaited correctly. Tweaked some functions to improve this

* Use correct content type
This commit is contained in:
Kevin Cunnane
2019-08-21 15:23:17 -07:00
committed by GitHub
parent 8e668b657c
commit 017ed93061
2 changed files with 50 additions and 14 deletions

View File

@@ -84,7 +84,7 @@ export class UploadFilesCommand extends ProgressCommand {
if (fileUris) {
let files: IFile[] = fileUris.map(uri => uri.fsPath).map(this.mapPathsToFiles());
await this.executeWithProgress(
async (cancelToken: vscode.CancellationTokenSource) => this.writeFiles(files, folderNode, cancelToken),
(cancelToken: vscode.CancellationTokenSource) => this.writeFiles(files, folderNode, cancelToken),
localize('uploading', 'Uploading files to HDFS'), true,
() => this.apiWrapper.showInformationMessage(localize('uploadCanceled', 'Upload operation was canceled')));
if (context.type === constants.ObjectExplorerService) {
@@ -264,7 +264,7 @@ export class SaveFileCommand extends ProgressCommand {
});
if (fileUri) {
await this.executeWithProgress(
async (cancelToken: vscode.CancellationTokenSource) => this.doSaveAndOpen(fileUri, fileNode, cancelToken),
(cancelToken: vscode.CancellationTokenSource) => this.doSaveAndOpen(fileUri, fileNode, cancelToken),
localize('saving', 'Saving HDFS Files'), true,
() => this.apiWrapper.showInformationMessage(localize('saveCanceled', 'Save operation was canceled')));
}

View File

@@ -15,6 +15,16 @@ import { IHdfsOptions, IRequestParams } from './fileSources';
const localize = nls.loadMessageBundle();
const ErrorMessageInvalidDataStructure = localize('webhdfs.invalidDataStructure', "Invalid Data Structure");
const emitError = (instance, err) => {
const isErrorEmitted = instance.errorEmitted;
if (!isErrorEmitted) {
instance.emit('error', err);
instance.emit('finish');
}
instance.errorEmitted = true;
};
export class WebHDFS {
private _requestParams: IRequestParams;
private _opts: IHdfsOptions;
@@ -544,31 +554,55 @@ export class WebHDFS {
return this.doCreateWriteStream(params);
}
// Else, must add kerberos token and handle redirects
return this.createKerberosWriteStream(params);
}
private createKerberosWriteStream(params: any): fs.WriteStream {
params.followRedirect = false;
let replyStream = through();
// Create an intermediate stream that pauses until we get a positive
// response from the server
let isWaiting = true;
let firstCb: Function = undefined;
let replyStream = through(function (chunk, enc, cb) {
this.push(chunk, enc);
if (isWaiting) {
firstCb = cb;
} else {
cb();
}
});
let handleErr = (err) => {
replyStream.emit('error', err);
replyStream.end();
};
let initRedirectedStream = () => {
let redirectedStream = this.doCreateWriteStream(params);
replyStream.pipe(redirectedStream);
// After redirect, create valid stream to correct location
// and pipe the intermediate stream to it, unblocking the data flow
params.headers['content-type'] = 'application/octet-stream';
let upload = request(params, (err, res, bo) => {
if (err || this.isError(res)) {
emitError(replyStream, this.parseError(res, bo, err));
replyStream.end();
}
else if (res.headers.hasOwnProperty('location')) {
replyStream.emit('finish', res.headers.location);
}
else {
replyStream.emit('finish');
}
});
isWaiting = false;
replyStream.pipe(upload);
if (firstCb) {
firstCb();
}
};
this.requestWithRedirectAndAuth(params, initRedirectedStream, handleErr);
return <fs.WriteStream><any>replyStream;
}
private doCreateWriteStream(params: any): fs.WriteStream {
let emitError = (instance, err) => {
const isErrorEmitted = instance.errorEmitted;
if (!isErrorEmitted) {
instance.emit('error', err);
instance.emit('finish');
}
instance.errorEmitted = true;
};
let canResume: boolean = true;
let stream = undefined;
let req = request(params, (error, response, body) => {
@@ -680,6 +714,8 @@ export class WebHDFS {
this.setKerberosAuthOnParams(params)
.then(onRedirected)
.catch(handleErr);
} else {
handleErr(err);
}
});
}