Added a rewritten WebHDFS client that provides correct Error objects and localized error messages (#4223)

This commit is contained in:
Gene Lee
2019-03-04 15:23:50 -08:00
committed by GitHub
parent 2309b16bd4
commit f4365dbd3a
6 changed files with 840 additions and 138 deletions

View File

@@ -5,16 +5,18 @@
'use strict';
import * as fspath from 'path';
import * as webhdfs from 'webhdfs';
import * as fs from 'fs';
import * as meter from 'stream-meter';
import * as bytes from 'bytes';
import * as https from 'https';
import * as readline from 'readline';
import * as os from 'os';
import * as nls from 'vscode-nls';
import * as constants from '../constants';
import * as utils from '../utils';
import { WebHDFS, HdfsError } from './webhdfs';
const localize = nls.loadMessageBundle();
export function joinHdfsPath(parent: string, child: string): string {
if (parent === constants.hdfsRootPath) {
@@ -93,97 +95,6 @@ export interface IHdfsFileStatus {
pathSuffix: string;
}
/**
* Contract for the callback- and stream-based HDFS client operations declared below:
* directory reading, read/write streams, directory creation, deletion and existence checks.
*/
export interface IHdfsClient {
/**
* Read a directory; the outcome is delivered through the callback
*
* @method readdir
*
* @param {String} path
* @param {Function} callback Receives `err` and `files`; the entries are untyped here (`any[]`)
*/
readdir(path: string, callback: (err: Error, files: any[]) => void): void;
/**
* Create readable stream for given path
*
* @method createReadStream
* @fires Request#data
* @fires WebHDFS#finish
*
* @param {String} path
* @param {Object} [opts]
*
* @returns {fs.ReadStream}
*/
createReadStream (path: string, opts?: object): fs.ReadStream;
/**
* Create writable stream for given path
*
* @example
*
* var WebHDFS = require('webhdfs');
* var hdfs = WebHDFS.createClient();
*
* var localFileStream = fs.createReadStream('/path/to/local/file');
* var remoteFileStream = hdfs.createWriteStream('/path/to/remote/file');
*
* localFileStream.pipe(remoteFileStream);
*
* remoteFileStream.on('error', function onError (err) {
* // Do something with the error
* });
*
* remoteFileStream.on('finish', function onFinish () {
* // Upload is done
* });
*
* @method createWriteStream
* @fires WebHDFS#finish
*
* @param {String} path
* @param {Boolean} [append] If set to true then append data to the file
* @param {Object} [opts]
*
* @returns {fs.WriteStream}
*/
createWriteStream(path: string, append?: boolean, opts?: object): fs.WriteStream;
/**
* Make new directory
*
* Two overloads are declared: one taking only `path` and `callback`,
* and one that additionally takes `mode` between them.
*
* @method mkdir
*
* @param {String} path
* @param {String} [mode] Only present in the second overload.
* NOTE(review): the earlier doc stated a default of 0777 - confirm against the implementation
* @param {Function} callback Untyped (`Function`); its arguments are not described by this interface
*
* @returns {void}
*/
mkdir (path: string, callback: Function): void;
mkdir (path: string, mode: string, callback: Function): void;
/**
* Delete directory or file path
*
* NOTE(review): this member was previously documented under the name `unlink`
* with `recursive` optional (default false); the signature here is `rmdir`
* and requires `recursive` - confirm which behavior the implementation has.
*
* @method rmdir
*
* @param {String} path
* @param {Boolean} recursive
* @param {Function} callback Untyped (`Function`); its arguments are not described by this interface
*
* @returns {void}
*/
rmdir (path: string, recursive: boolean, callback: Function): void;
/**
* Check file existence
* Wraps stat method (per the original documentation; not verifiable from this declaration)
*
* @method exists
* @see WebHDFS.stat
*
* @param {String} path
* @param {Function} callback Untyped (`Function`); its arguments are not described by this interface
*
* @returns {Boolean} NOTE(review): the declared return type is `boolean` although a
* callback is also taken, and the earlier doc said `{Object}` - confirm what callers may rely on
*/
exists (path: string, callback: Function): boolean;
}
export class FileSourceFactory {
private static _instance: FileSourceFactory;
@@ -204,11 +115,11 @@ export class FileSourceFactory {
port: options.port,
path: constants.hdfsRootPath,
rejectUnauthorized: false
};
};
let agent = new https.Agent(agentOptions);
requestParams['agent'] = agent;
}
return new HdfsFileSource(webhdfs.createClient(options, requestParams));
return new HdfsFileSource(WebHDFS.createClient(options, requestParams));
}
// remove port from host when port is specified after a comma or colon
@@ -231,7 +142,7 @@ export class FileSourceFactory {
}
export class HdfsFileSource implements IFileSource {
constructor(private client: IHdfsClient) {
constructor(private client: WebHDFS) {
}
public enumerateFiles(path: string): Promise<IFile[]> {
@@ -253,7 +164,7 @@ export class HdfsFileSource implements IFileSource {
public mkdir(dirName: string, remoteBasePath: string): Promise<void> {
return new Promise((resolve, reject) => {
let remotePath = joinHdfsPath(remoteBasePath, dirName);
this.client.mkdir(remotePath, (err) => {
this.client.mkdir(remotePath, undefined, (err) => {
if (err) {
reject(err);
} else {
@@ -269,20 +180,25 @@ export class HdfsFileSource implements IFileSource {
public readFile(path: string, maxBytes?: number): Promise<Buffer> {
return new Promise((resolve, reject) => {
let error: HdfsError = undefined;
let remoteFileStream = this.client.createReadStream(path);
if (maxBytes) {
remoteFileStream = remoteFileStream.pipe(meter(maxBytes));
}
let data = [];
let error = undefined;
remoteFileStream.on('error', (err) => {
error = err.toString();
if (error.includes('Stream exceeded specified max')) {
error = `File exceeds max size of ${bytes(maxBytes)}`;
}
error = <HdfsError>err;
reject(error);
});
if (maxBytes) {
remoteFileStream = remoteFileStream.pipe(meter(maxBytes));
remoteFileStream.on('error', (err) => {
error = <HdfsError>err;
if (error.message.includes('Stream exceeded specified max')) {
error.message = localize('maxSizeReached', 'File exceeds max size of ${0}', bytes(maxBytes));
}
reject(error);
});
}
let data: any[] = [];
remoteFileStream.on('data', (chunk) => {
data.push(chunk);
});
@@ -303,7 +219,7 @@ export class HdfsFileSource implements IFileSource {
let lineCount = 0;
let lineData: string[] = [];
let errorMsg = undefined;
let error: HdfsError = undefined;
lineReader.on('line', (line: string) => {
lineCount++;
lineData.push(line);
@@ -313,11 +229,11 @@ export class HdfsFileSource implements IFileSource {
}
})
.on('error', (err) => {
errorMsg = utils.getErrorMessage(err);
reject(errorMsg);
error = <HdfsError>err;
reject(error);
})
.on('close', () => {
if (!errorMsg) {
if (!error) {
resolve(Buffer.from(lineData.join(os.EOL)));
}
});
@@ -329,23 +245,26 @@ export class HdfsFileSource implements IFileSource {
let fileName = fspath.basename(localFile.path);
let remotePath = joinHdfsPath(remoteDirPath, fileName);
let error: HdfsError = undefined;
let writeStream = this.client.createWriteStream(remotePath);
let readStream = fs.createReadStream(localFile.path);
readStream.pipe(writeStream);
let error: string | Error = undefined;
// API always calls finish, so catch error then handle exit in the finish event
writeStream.on('error', (err => {
error = err;
writeStream.on('error', (err) => {
error = <HdfsError>err;
reject(error);
}));
});
writeStream.on('finish', (location) => {
if (!error) {
resolve(location);
}
});
let readStream = fs.createReadStream(localFile.path);
readStream.on('error', (err) => {
error = err;
reject(error);
});
readStream.pipe(writeStream);
});
}
@@ -363,8 +282,12 @@ export class HdfsFileSource implements IFileSource {
public exists(path: string): Promise<boolean> {
return new Promise((resolve, reject) => {
this.client.exists(path, (result) => {
resolve(result);
this.client.exists(path, (error, exists) => {
if (error) {
reject(error);
} else {
resolve(exists);
}
});
});
}