diff --git a/extensions/mssql/package.json b/extensions/mssql/package.json index 6a5a041e6c..9acfd52e01 100644 --- a/extensions/mssql/package.json +++ b/extensions/mssql/package.json @@ -30,7 +30,7 @@ "uri-js": "^4.2.2", "vscode-extension-telemetry": "^0.0.15", "vscode-nls": "^4.0.0", - "webhdfs": "^1.1.1" + "buffer-stream-reader": "^0.1.1" }, "devDependencies": {}, "contributes": { diff --git a/extensions/mssql/src/objectExplorerNodeProvider/fileSources.ts b/extensions/mssql/src/objectExplorerNodeProvider/fileSources.ts index 5fa581c7a4..5f95b3652f 100644 --- a/extensions/mssql/src/objectExplorerNodeProvider/fileSources.ts +++ b/extensions/mssql/src/objectExplorerNodeProvider/fileSources.ts @@ -5,16 +5,18 @@ 'use strict'; import * as fspath from 'path'; -import * as webhdfs from 'webhdfs'; import * as fs from 'fs'; import * as meter from 'stream-meter'; import * as bytes from 'bytes'; import * as https from 'https'; import * as readline from 'readline'; import * as os from 'os'; +import * as nls from 'vscode-nls'; import * as constants from '../constants'; -import * as utils from '../utils'; +import { WebHDFS, HdfsError } from './webhdfs'; + +const localize = nls.loadMessageBundle(); export function joinHdfsPath(parent: string, child: string): string { if (parent === constants.hdfsRootPath) { @@ -93,97 +95,6 @@ export interface IHdfsFileStatus { pathSuffix: string; } -export interface IHdfsClient { - readdir(path: string, callback: (err: Error, files: any[]) => void): void; - - /** - * Create readable stream for given path - * - * @method createReadStream - * @fires Request#data - * @fires WebHDFS#finish - * - * @param {String} path - * @param {Object} [opts] - * - * @returns {Object} - */ - createReadStream (path: string, opts?: object): fs.ReadStream; - - /** - * Create writable stream for given path - * - * @example - * - * var WebHDFS = require('webhdfs'); - * var hdfs = WebHDFS.createClient(); - * - * var localFileStream = fs.createReadStream('/path/to/local/file'); - * var remoteFileStream = hdfs.createWriteStream('/path/to/remote/file'); - * - * localFileStream.pipe(remoteFileStream); - * - * remoteFileStream.on('error', function onError (err) { - * // Do something with the error - * }); - * - * remoteFileStream.on('finish', function onFinish () { - * // Upload is done - * }); - * - * @method createWriteStream - * @fires WebHDFS#finish - * - * @param {String} path - * @param {Boolean} [append] If set to true then append data to the file - * @param {Object} [opts] - * - * @returns {Object} - */ - createWriteStream(path: string, append?: boolean, opts?: object): fs.WriteStream; - - /** - * Make new directory - * - * @method mkdir - * - * @param {String} path - * @param {String} [mode=0777] - * @param {Function} callback - * - * @returns {Object} - */ - mkdir (path: string, callback: Function): void; - mkdir (path: string, mode: string, callback: Function): void; - - /** - * Delete directory or file path - * - * @method unlink - * - * @param {String} path - * @param {Boolean} [recursive=false] - * @param {Function} callback - * - * @returns {Object} - */ - rmdir (path: string, recursive: boolean, callback: Function): void; - - /** - * Check file existence - * Wraps stat method - * - * @method stat - * @see WebHDFS.stat - * - * @param {String} path - * @param {Function} callback - * - * @returns {Object} - */ - exists (path: string, callback: Function): boolean; -} - export class FileSourceFactory { private static _instance: FileSourceFactory; @@ -204,11 +115,11 @@ export class FileSourceFactory { port: options.port, path: constants.hdfsRootPath, rejectUnauthorized: false - }; + }; let agent = new https.Agent(agentOptions); requestParams['agent'] = agent; } - return new HdfsFileSource(webhdfs.createClient(options, requestParams)); + return new HdfsFileSource(WebHDFS.createClient(options, requestParams)); } // remove port from host when port is specified after a comma or colon @@ -231,7 +142,7 @@ export class FileSourceFactory { } export class HdfsFileSource implements IFileSource { - constructor(private client: IHdfsClient) { + constructor(private client: WebHDFS) { } public enumerateFiles(path: string): Promise { @@ -253,7 +164,7 @@ export class HdfsFileSource implements IFileSource { public mkdir(dirName: string, remoteBasePath: string): Promise { return new Promise((resolve, reject) => { let remotePath = joinHdfsPath(remoteBasePath, dirName); - this.client.mkdir(remotePath, (err) => { + this.client.mkdir(remotePath, undefined, (err) => { if (err) { reject(err); } else { @@ -269,20 +180,25 @@ export class HdfsFileSource implements IFileSource { public readFile(path: string, maxBytes?: number): Promise { return new Promise((resolve, reject) => { + let error: HdfsError = undefined; let remoteFileStream = this.client.createReadStream(path); - if (maxBytes) { - remoteFileStream = remoteFileStream.pipe(meter(maxBytes)); - } - let data = []; - let error = undefined; remoteFileStream.on('error', (err) => { - error = err.toString(); - if (error.includes('Stream exceeded specified max')) { - error = `File exceeds max size of ${bytes(maxBytes)}`; - } + error = err; reject(error); }); + if (maxBytes) { + remoteFileStream = remoteFileStream.pipe(meter(maxBytes)); + remoteFileStream.on('error', (err) => { + error = err; + if (error.message.includes('Stream exceeded specified max')) { + error.message = localize('maxSizeReached', 'File exceeds max size of ${0}', bytes(maxBytes)); + } + reject(error); + }); + } + + let data: any[] = []; remoteFileStream.on('data', (chunk) => { data.push(chunk); }); @@ -303,7 +219,7 @@ export class HdfsFileSource implements IFileSource { let lineCount = 0; let lineData: string[] = []; - let errorMsg = undefined; + let error: HdfsError = undefined; lineReader.on('line', (line: string) => { lineCount++; lineData.push(line); @@ -313,11 +229,11 @@ export class HdfsFileSource implements IFileSource { } }) .on('error', (err) => { - errorMsg = utils.getErrorMessage(err); - reject(errorMsg); + error = err; + reject(error); }) .on('close', () => { - if (!errorMsg) { + if (!error) { resolve(Buffer.from(lineData.join(os.EOL))); } }); @@ -329,23 +245,26 @@ export class HdfsFileSource implements IFileSource { let fileName = fspath.basename(localFile.path); let remotePath = joinHdfsPath(remoteDirPath, fileName); + let error: HdfsError = undefined; let writeStream = this.client.createWriteStream(remotePath); - - let readStream = fs.createReadStream(localFile.path); - readStream.pipe(writeStream); - - let error: string | Error = undefined; - // API always calls finish, so catch error then handle exit in the finish event - writeStream.on('error', (err => { - error = err; + writeStream.on('error', (err) => { + error = err; reject(error); - })); + }); writeStream.on('finish', (location) => { if (!error) { resolve(location); } }); + + let readStream = fs.createReadStream(localFile.path); + readStream.on('error', (err) => { + error = err; + reject(error); + }); + + readStream.pipe(writeStream); }); } @@ -363,8 +282,12 @@ export class HdfsFileSource implements IFileSource { public exists(path: string): Promise { return new Promise((resolve, reject) => { - this.client.exists(path, (result) => { - resolve(result); + this.client.exists(path, (error, exists) => { + if (error) { + reject(error); + } else { + resolve(exists); + } }); }); } diff --git a/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts b/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts index cba85cd86f..e1db838b1a 100644 --- a/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts +++ b/extensions/mssql/src/objectExplorerNodeProvider/hdfsCommands.ts @@ -98,7 +98,8 @@ export class UploadFilesCommand extends ProgressCommand { } } } catch (err) { - this.apiWrapper.showErrorMessage(localize('uploadError', 'Error uploading files: {0}', utils.getErrorMessage(err))); + this.apiWrapper.showErrorMessage( + localize('uploadError', 'Error uploading files: {0}', utils.getErrorMessage(err, true))); } } @@ -156,7 +157,8 @@ export class MkDirCommand extends ProgressCommand { } } } catch (err) { - this.apiWrapper.showErrorMessage(localize('uploadError', 'Error uploading files: {0}', utils.getErrorMessage(err))); + this.apiWrapper.showErrorMessage( + localize('mkDirError', 'Error on making directory: {0}', utils.getErrorMessage(err, true))); } } @@ -213,7 +215,8 @@ export class DeleteFilesCommand extends Command { this.apiWrapper.showErrorMessage(LocalizedConstants.msgMissingNodeContext); } } catch (err) { - this.apiWrapper.showErrorMessage(localize('deleteError', 'Error deleting files {0}', utils.getErrorMessage(err))); + this.apiWrapper.showErrorMessage( + localize('deleteError', 'Error on deleting files: {0}', utils.getErrorMessage(err, true))); } } @@ -273,7 +276,8 @@ export class SaveFileCommand extends ProgressCommand { this.apiWrapper.showErrorMessage(LocalizedConstants.msgMissingNodeContext); } } catch (err) { - this.apiWrapper.showErrorMessage(localize('saveError', 'Error saving file: {0}', utils.getErrorMessage(err))); + this.apiWrapper.showErrorMessage( + localize('saveError', 'Error on saving file: {0}', utils.getErrorMessage(err, true))); } } @@ -313,7 +317,8 @@ export class PreviewFileCommand extends ProgressCommand { this.apiWrapper.showErrorMessage(LocalizedConstants.msgMissingNodeContext); } } catch (err) { - this.apiWrapper.showErrorMessage(localize('previewError', 'Error previewing file: {0}', utils.getErrorMessage(err))); + this.apiWrapper.showErrorMessage( + localize('previewError', 'Error on previewing file: {0}', utils.getErrorMessage(err, true))); } } @@ -357,7 +362,8 @@ export class CopyPathCommand extends Command { this.apiWrapper.showErrorMessage(LocalizedConstants.msgMissingNodeContext); } } catch (err) { - this.apiWrapper.showErrorMessage(localize('copyPathError', 'Error copying path: {0}', utils.getErrorMessage(err))); + this.apiWrapper.showErrorMessage( + localize('copyPathError', 'Error on copying path: {0}', utils.getErrorMessage(err, true))); } } } diff --git a/extensions/mssql/src/objectExplorerNodeProvider/hdfsProvider.ts b/extensions/mssql/src/objectExplorerNodeProvider/hdfsProvider.ts index 2cb29e36a0..5270a20fae 100644 --- a/extensions/mssql/src/objectExplorerNodeProvider/hdfsProvider.ts +++ b/extensions/mssql/src/objectExplorerNodeProvider/hdfsProvider.ts @@ -281,17 +281,14 @@ export class FileNode extends HdfsFileSourceNode implements IFileNode { public writeFileContentsToDisk(localPath: string, cancelToken?: vscode.CancellationTokenSource): Promise { return new Promise((resolve, reject) => { let readStream: fs.ReadStream = this.fileSource.createReadStream(this.hdfsPath); + readStream.on('error', (err) => { + reject(err); + }); + + let error: string | Error = undefined; let writeStream = fs.createWriteStream(localPath, { encoding: 'utf8' }); - let cancelable = new CancelableStream(cancelToken); - cancelable.on('error', (err) => { - reject(err); - }); - readStream.pipe(cancelable).pipe(writeStream); - - let error: string | Error = undefined; - writeStream.on('error', (err) => { error = err; reject(error); @@ -301,6 +298,13 @@ export class FileNode extends HdfsFileSourceNode implements IFileNode { resolve(vscode.Uri.file(localPath)); } }); + + let cancelable = new CancelableStream(cancelToken); + cancelable.on('error', (err) => { + reject(err); + }); + + readStream.pipe(cancelable).pipe(writeStream); }); } diff --git a/extensions/mssql/src/objectExplorerNodeProvider/webhdfs.ts b/extensions/mssql/src/objectExplorerNodeProvider/webhdfs.ts new file mode 100644 index 0000000000..78de659999 --- /dev/null +++ b/extensions/mssql/src/objectExplorerNodeProvider/webhdfs.ts @@ -0,0 +1,755 @@ +// This code is originally from https://github.com/harrisiirak/webhdfs +// License: https://github.com/harrisiirak/webhdfs/blob/master/LICENSE + +'use strict'; + +import * as url from 'url'; +import * as fs from 'fs'; +import * as querystring from 'querystring'; +import * as request from 'request'; +import * as BufferStreamReader from 'buffer-stream-reader'; +import * as nls from 'vscode-nls'; +import { IHdfsOptions, IRequestParams } from './fileSources'; + +const localize = nls.loadMessageBundle(); +const ErrorMessageInvalidDataStructure = + localize('webhdfs.invalidDataStructure', 'Invalid Data Structure'); + +export class WebHDFS { + private _requestParams: IRequestParams; + private _opts: IHdfsOptions; + private _url: any; + + constructor(opts: IHdfsOptions, requestParams: IRequestParams) { + if (!(this instanceof WebHDFS)) { + return new WebHDFS(opts, requestParams); + } + + let missingProps = ['host', 'port', 'path'] + .filter(p => !opts.hasOwnProperty(p) || !opts[p]); + if (missingProps && missingProps.length > 0) { + throw new Error(localize('webhdfs.missingProperties', + 'Unable to create WebHDFS client due to missing options: ${0}', missingProps.join(', '))); + } + + this._requestParams = requestParams || {}; + this._requestParams.timeout = this._requestParams.timeout || 10000; + + this._opts = opts; + this._url = { + protocol: opts.protocol || 'http', + hostname: opts.host.trim(), + port: opts.port || 80, + pathname: opts.path + }; + } + + private checkArgDefined(argName: string, argValue: any): void { + if (!argValue) { + throw new Error(localize('webhdfs.undefinedArgument', '\'${0}\' is undefined.', argName)); + } + } + + /** + * Generate WebHDFS REST API endpoint URL for given operation + * + * @param {string} operation WebHDFS operation name + * @param {string} path + * @param {object} params + * @returns {string} WebHDFS REST API endpoint URL + */ + private getOperationEndpoint(operation: string, path: string, params?: object): string { + let endpoint = this._url; + endpoint.pathname = this._opts.path + path; + let searchOpts = Object.assign( + { 'op': operation }, + this._opts.user ? { 'user.name': this._opts.user } : {}, + params || {} + ); + endpoint.search = querystring.stringify(searchOpts); + return url.format(endpoint); + } + + /** + * Gets localized status message for given status code + * + * @param {number} statusCode Http status code + * @returns {string} status message + */ + private toStatusMessage(statusCode: number): string { + let statusMessage: string = undefined; + switch (statusCode) { + case 400: statusMessage = localize('webhdfs.httpError400', 'Bad Request'); break; + case 401: statusMessage = localize('webhdfs.httpError401', 'Unauthorized'); break; + case 403: statusMessage = localize('webhdfs.httpError403', 'Forbidden'); break; + case 404: statusMessage = localize('webhdfs.httpError404', 'Not Found'); break; + case 500: statusMessage = localize('webhdfs.httpError500', 'Internal Server Error'); break; + // TODO: define more messages here + default: break; + } + return statusMessage; + } + + /** + * Gets status message from response + * + * @param {request.Response} response response object + * @param {boolean} strict If set true then RemoteException must be present in the body + * @returns {string} Error message interpreted by status code + */ + private getStatusMessage(response: request.Response): string { + if (!response) { return undefined; } + let statusMessage: string = this.toStatusMessage(response.statusCode) + || (response && response.statusMessage); + return statusMessage; + } + + /** + * Gets remote exception message from response body + * + * @param {any} responseBody response body + * @returns {string} Error message interpreted by status code + */ + private getRemoteExceptionMessage(responseBody: any): string { + if (!responseBody) { return undefined; } + if (typeof responseBody === 'string') { + try { + responseBody = JSON.parse(responseBody); + } catch { } + } + let remoteExceptionMessage: string = undefined; + if (responseBody.hasOwnProperty('RemoteException') + && responseBody.RemoteException.hasOwnProperty('message')) { + remoteExceptionMessage = responseBody.RemoteException.message; + } + return remoteExceptionMessage; + } + + /** + * Generates error message descriptive as much as possible + * + * @param {string} statusMessage status message + * @param {string} [remoteExceptionMessage] remote exception message + * @param {any} [error] error + * @returns {string} error message + */ + private getErrorMessage(statusMessage: string, remoteExceptionMessage?: string, error?: any): string { + statusMessage = statusMessage === '' ? undefined : statusMessage; + remoteExceptionMessage = remoteExceptionMessage === '' ? undefined : remoteExceptionMessage; + let messageFromError: string = error ? (error['message'] || error.toString()) : undefined; + return statusMessage && remoteExceptionMessage ? + `${statusMessage} (${remoteExceptionMessage})` : + statusMessage || remoteExceptionMessage || messageFromError || + localize('webhdfs.unknownError', 'Unknown Error'); + } + + /** + * Parse error state from response and return valid Error object + * + * @param {request.Response} response response object + * @param {any} [responseBody] response body + * @param {any} [error] error + * @returns {HdfsError} HdfsError object + */ + private parseError(response: request.Response, responseBody?: any, error?: any): HdfsError { + let statusMessage: string = this.getStatusMessage(response); + if (!responseBody && response) { + responseBody = response.body; + } + let remoteExceptionMessage: string = this.getRemoteExceptionMessage(responseBody); + let errorMessage: string = this.getErrorMessage(statusMessage, remoteExceptionMessage, error); + return new HdfsError(errorMessage, response && response.statusCode, + response && response.statusMessage, remoteExceptionMessage, error); + } + + /** + * Check if response is redirect + * + * @param {request.Response} response response object + * @returns {boolean} if response is redirect + */ + private isRedirect(response: request.Response): boolean { + return [301, 307].indexOf(response.statusCode) !== -1 && + response.headers.hasOwnProperty('location'); + } + + /** + * Check if response is successful + * + * @param {request.Response} response response object + * @returns {boolean} if response is successful + */ + private isSuccess(response: request.Response): boolean { + return [200, 201].indexOf(response.statusCode) !== -1; + } + + /** + * Check if response is error + * + * @param {request.Response} response response object + * @returns {boolean} if response is error + */ + private isError(response: request.Response): boolean { + return [400, 401, 402, 403, 404, 500].indexOf(response.statusCode) !== -1; + } + + /** + * Send a request to WebHDFS REST API + * + * @param {string} method HTTP method + * @param {string} url + * @param {object} opts Options for request + * @param {(error: HdfsError, response: request.Response) => void} callback + * @returns void + */ + private sendRequest(method: string, url: string, opts: object, + callback: (error: HdfsError, response: request.Response) => void): void { + + let requestParams = Object.assign( + { method: method, url: url, json: true }, + this._requestParams, + opts || {} + ); + + request(requestParams, (error, response, body) => { + if (!callback) { return; } + if (this.isSuccess(response)) { + callback(undefined, response); + } else if (error || this.isError(response)) { + let hdfsError = this.parseError(response, body, error); + callback(hdfsError, response); + } else { + let hdfsError = new HdfsError( + localize('webhdfs.unexpectedRedirect', 'Unexpected Redirect'), + response && response.statusCode, + response && response.statusMessage, + this.getRemoteExceptionMessage(body || response.body), + error + ); + callback(hdfsError, response); + } + }); + } + + /** + * Change file permissions + * + * @param {string} path + * @param {string} mode + * @param {(error: HdfsError) => void} callback + * @returns void + */ + public chmod(path: string, mode: string, callback: (error: HdfsError) => void): void { + this.checkArgDefined('path', path); + this.checkArgDefined('mode', mode); + + let endpoint = this.getOperationEndpoint('setpermission', path, { permission: mode }); + this.sendRequest('PUT', endpoint, undefined, (error) => { + return callback && callback(error); + }); + } + + /** + * Change file owner + * + * @param {string} path + * @param {string} userId User name + * @param {string} groupId Group name + * @param {(error: HdfsError) => void} callback + * @returns void + */ + public chown(path: string, userId: string, groupId: string, callback: (error: HdfsError) => void): void { + this.checkArgDefined('path', path); + this.checkArgDefined('userId', userId); + this.checkArgDefined('groupId', groupId); + + let endpoint = this.getOperationEndpoint('setowner', path, { + owner: userId, + group: groupId + }); + + this.sendRequest('PUT', endpoint, undefined, (error) => { + if (callback) { + callback(error); + } + }); + } + + /** + * Read directory contents + * + * @param {string} path + * @param {(error: HdfsError, files: any[]) => void)} callback + * @returns void + */ + public readdir(path: string, callback: (error: HdfsError, files: any[]) => void): void { + this.checkArgDefined('path', path); + + let endpoint = this.getOperationEndpoint('liststatus', path); + this.sendRequest('GET', endpoint, undefined, (error, response) => { + if (!callback) { return; } + + let files: any[] = []; + if (error) { + callback(error, undefined); + } else if (response.body.hasOwnProperty('FileStatuses') + && response.body.FileStatuses.hasOwnProperty('FileStatus')) { + files = response.body.FileStatuses.FileStatus; + callback(undefined, files); + } else { + callback(new HdfsError(ErrorMessageInvalidDataStructure), undefined); + } + }); + } + + /** + * Make new directory + * + * @param {string} path + * @param {string} [permission=0755] + * @param {(error: HdfsError) => void} callback + * @returns void + */ + public mkdir(path: string, permission: string = '0755', callback: (error: HdfsError) => void): void { + this.checkArgDefined('path', path); + + let endpoint = this.getOperationEndpoint('mkdirs', path, { + permission: permission + }); + + this.sendRequest('PUT', endpoint, undefined, (error) => { + if (callback) { + callback(error); + } + }); + } + + /** + * Rename path + * + * @param {string} path + * @param {string} destination + * @param {(error: HdfsError) => void} callback + * @returns void + */ + public rename(path: string, destination: string, callback: (error: HdfsError) => void): void { + this.checkArgDefined('path', path); + this.checkArgDefined('destination', destination); + + let endpoint = this.getOperationEndpoint('rename', path, { + destination: destination + }); + + this.sendRequest('PUT', endpoint, undefined, (error) => { + if (callback) { + callback(error); + } + }); + } + + /** + * Get file status for given path + * + * @param {string} path + * @param {(error: HdfsError, fileStatus: any) => void} callback + * @returns void + */ + public stat(path: string, callback: (error: HdfsError, fileStatus: any) => void): void { + this.checkArgDefined('path', path); + + let endpoint = this.getOperationEndpoint('getfilestatus', path); + this.sendRequest('GET', endpoint, undefined, (error, response) => { + if (!callback) { return; } + if (error) { + callback(error, undefined); + } else if (response.body.hasOwnProperty('FileStatus')) { + callback(undefined, response.body.FileStatus); + } else { + callback(new HdfsError(ErrorMessageInvalidDataStructure), undefined); + } + }); + } + + /** + * Check file existence + * Wraps stat method + * + * @see WebHDFS.stat + * @param {string} path + * @param {(error: HdfsError, exists: boolean) => void} callback + * @returns void + */ + public exists(path: string, callback: (error: HdfsError, exists: boolean) => void): void { + this.checkArgDefined('path', path); + + this.stat(path, (error, fileStatus) => { + let exists = !fileStatus ? false : true; + callback(error, exists); + }); + } + + /** + * Write data to the file + * + * @param {string} path + * @param {string | Buffer} data + * @param {boolean} append If set to true then append data to the file + * @param {object} opts + * @param {(error: HdfsError) => void} callback + * @returns {fs.WriteStream} + */ + public writeFile(path: string, data: string | Buffer, append: boolean, opts: object, + callback: (error: HdfsError) => void): fs.WriteStream { + this.checkArgDefined('path', path); + this.checkArgDefined('data', data); + + let error: HdfsError = null; + let localStream = new BufferStreamReader(data); + let remoteStream: fs.WriteStream = this.createWriteStream(path, !!append, opts || {}); + + // Handle events + remoteStream.once('error', (err) => { + error = err; + }); + + remoteStream.once('finish', () => { + if (callback && error) { + callback(error); + } + }); + + localStream.pipe(remoteStream); // Pipe data + return remoteStream; + } + + /** + * Append data to the file + * + * @see writeFile + * @param {string} path + * @param {string | Buffer} data + * @param {object} opts + * @param {(error: HdfsError) => void} callback + * @returns {fs.WriteStream} + */ + public appendFile(path: string, data: string | Buffer, opts: object, callback: (error: HdfsError) => void): fs.WriteStream { + return this.writeFile(path, data, true, opts, callback); + } + + /** + * Read data from the file + * + * @fires Request#data + * @fires WebHDFS#finish + * @param {path} path + * @param {(error: HdfsError, buffer: Buffer) => void} callback + * @returns void + */ + public readFile(path: string, callback: (error: HdfsError, buffer: Buffer) => void): void { + this.checkArgDefined('path', path); + + let remoteFileStream = this.createReadStream(path); + let data: any[] = []; + let error: HdfsError = undefined; + + remoteFileStream.once('error', (err) => { + error = err; + }); + + remoteFileStream.on('data', (dataChunk) => { + data.push(dataChunk); + }); + + remoteFileStream.once('finish', () => { + if (!callback) { return; } + if (!error) { + callback(undefined, Buffer.concat(data)); + } else { + callback(error, undefined); + } + }); + } + + /** + * Create writable stream for given path + * + * @fires WebHDFS#finish + * @param {string} path + * @param {boolean} [append] If set to true then append data to the file + * @param {object} [opts] + * @returns {fs.WriteStream} + * + * @example + * let hdfs = WebHDFS.createClient(); + * + * let localFileStream = hdfs.createReadStream('/path/to/local/file'); + * let remoteFileStream = hdfs.createWriteStream('/path/to/remote/file'); + * + * localFileStream.pipe(remoteFileStream); + * + * remoteFileStream.on('error', (err) => { + * // Do something with the error + * }); + * + * remoteFileStream.on('finish', () => { + * // Upload is done + * }); + */ + public createWriteStream(path: string, append?: boolean, opts?: object): fs.WriteStream { + this.checkArgDefined('path', path); + + let emitError = (instance, err) => { + const isErrorEmitted = instance.errorEmitted; + + if (!isErrorEmitted) { + instance.emit('error', err); + instance.emit('finish'); + } + + instance.errorEmitted = true; + }; + + let endpoint = this.getOperationEndpoint( + append ? 'append' : 'create', + path, + Object.assign( + { + overwrite: true, + permission: '0755' + }, + opts || {} + ) + ); + + let stream = undefined; + let canResume: boolean = true; + let params = Object.assign( + { + method: append ? 'POST' : 'PUT', + url: endpoint, + json: true, + headers: { 'content-type': 'application/octet-stream' } + }, + this._requestParams + ); + + let req = request(params, (error, response, body) => { + // Handle redirect only if there was not an error (e.g. res is defined) + if (response && this.isRedirect(response)) { + let upload = request( + Object.assign(params, { url: response.headers.location }), + (err, res, bo) => { + if (err || this.isError(res)) { + emitError(req, this.parseError(res, bo, err)); + req.end(); + } else if (res.headers.hasOwnProperty('location')) { + req.emit('finish', res.headers.location); + } else { + req.emit('finish'); + } + } + ); + canResume = true; // Enable resume + stream.pipe(upload); + stream.resume(); + } + + if (error && !response) { + // request failed, and req is not accessible in this case. + throw this.parseError(undefined, undefined, error); + } + + if (error || this.isError(response)) { + emitError(req, this.parseError(response, body, error)); + } + }); + + req.on('pipe', (src) => { + // Pause read stream + stream = src; + stream.pause(); + + // This is not an elegant solution but here we go + // Basically we don't allow pipe() method to resume reading input + // and set internal _readableState.flowing to false + canResume = false; + stream.on('resume', () => { + if (!canResume) { + stream._readableState.flowing = false; + } + }); + + // Unpipe initial request + src.unpipe(req); + req.end(); + }); + + return req; + } + + /** + * Create readable stream for given path + * + * @fires Request#data + * @fires WebHDFS#finish + * @param {string} path + * @param {object} [opts] + * @returns {fs.ReadStream} + * + * @example + * let hdfs = WebHDFS.createClient(); + * + * let remoteFileStream = hdfs.createReadStream('/path/to/remote/file'); + * + * remoteFileStream.on('error', (err) => { + * // Do something with the error + * }); + * + * remoteFileStream.on('data', (dataChunk) => { + * // Do something with the data chunk + * }); + * + * remoteFileStream.on('finish', () => { + * // Upload is done + * }); + */ + public createReadStream(path: string, opts?: object): fs.ReadStream { + this.checkArgDefined('path', path); + + let endpoint = this.getOperationEndpoint('open', path, opts); + let params = Object.assign( + { + method: 'GET', + url: endpoint, + json: true + }, + this._requestParams + ); + + let req: request.Request = request(params); + + req.on('complete', (response) => { + req.emit('finish'); + }); + + req.on('response', (response) => { + // Handle remote exceptions + // Remove all data handlers and parse error data + if (this.isError(response)) { + req.removeAllListeners('data'); + req.on('data', (data) => { + req.emit('error', this.parseError(response, data.toString())); + req.end(); + }); + } else if (this.isRedirect(response)) { + let download = request(params); + + download.on('complete', (response) => { + req.emit('finish'); + }); + + // Proxy data to original data handler + // Not the nicest way but hey + download.on('data', (dataChunk) => { + req.emit('data', dataChunk); + }); + + // Handle subrequest + download.on('response', (response) => { + if (this.isError(response)) { + download.removeAllListeners('data'); + download.on('data', (data) => { + req.emit('error', this.parseError(response, data.toString())); + req.end(); + }); + } + }); + } + + // No need to interrupt the request + // data will be automatically sent to the data handler + }); + + return req; + } + + /** + * Create symbolic link to the destination path + * + * @param {string} src + * @param {string} destination + * @param {boolean} [createParent=false] + * @param {(error: HdfsError) => void} callback + * @returns void + */ + public symlink(src: string, destination: string, createParent: boolean = false, callback: (error: HdfsError) => void): void { + this.checkArgDefined('src', src); + this.checkArgDefined('destination', destination); + + let endpoint = this.getOperationEndpoint('createsymlink', src, { + createParent: createParent, + destination: destination + }); + + this.sendRequest('PUT', endpoint, undefined, (error) => { + if (callback) { + callback(error); + } + }); + } + + /** + * Unlink path + * + * @param {string} path + * @param {boolean} [recursive=false] + * @param {(error: any) => void} callback + * @returns void + */ + public unlink(path: string, recursive: boolean = false, callback: (error: HdfsError) => void): void { + this.checkArgDefined('path', path); + + let endpoint = this.getOperationEndpoint('delete', path, { recursive: recursive }); + this.sendRequest('DELETE', endpoint, undefined, (error) => { + if (callback) { + callback(error); + } + }); + } + + /** + * @alias WebHDFS.unlink + * @param {string} path + * @param {boolean} [recursive=false] + * @param {(error: any) => void} callback + * @returns void + */ + public rmdir(path: string, recursive: boolean = false, callback: (error: HdfsError) => void): void { + this.unlink(path, recursive, callback); + } + + public static createClient(opts, requestParams): WebHDFS { + return new WebHDFS( + Object.assign( + { + host: 'localhost', + port: '50070', + path: '/webhdfs/v1' + }, + opts || {} + ), + requestParams + ); + } +} + +export class HdfsError extends Error { + constructor( + errorMessage: string, + public statusCode?: number, + public statusMessage?: string, + public remoteExceptionMessage?: string, + public internalError?: any) { + super(errorMessage); + } +} diff --git a/extensions/mssql/src/utils.ts b/extensions/mssql/src/utils.ts index 2e09975bb5..5b6a9f9da6 100644 --- a/extensions/mssql/src/utils.ts +++ b/extensions/mssql/src/utils.ts @@ -170,8 +170,22 @@ export function verifyPlatform(): Thenable { } } -export function getErrorMessage(error: Error | string): string { - return (error instanceof Error) ? error.message : error; +export function getErrorMessage(error: Error | any, removeHeader: boolean = false): string { + let errorMessage: string = (error instanceof Error) ? error.message : error.toString(); + if (removeHeader) { + errorMessage = removeErrorHeader(errorMessage); + } + return errorMessage; +} + +export function removeErrorHeader(errorMessage: string): string { + if (errorMessage && errorMessage !== '') { + let header: string = 'Error:'; + if (errorMessage.startsWith(header)) { + errorMessage = errorMessage.substring(header.length); + } + } + return errorMessage; } export function isObjectExplorerContext(object: any): object is azdata.ObjectExplorerContext {