Add getaclstatus/setacl calls to WebHDFS API (#7378)

* Add getaclstatus/setacl calls to WebHDFS API

* Fix hygiene check
This commit is contained in:
Charles Gagnon
2019-09-27 13:45:45 -07:00
committed by GitHub
parent 00f8dcb23e
commit 63f3d9862f
4 changed files with 319 additions and 8 deletions

View File

@@ -0,0 +1,228 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
/**
 * Parsed form of the response to a getAclStatus call.
 */
export interface IAclStatus {
    /** Every ACL entry defined for the object. */
    entries: AclEntry[];
    /** ACL entry describing the owner permissions. */
    owner: AclEntry;
    /** ACL entry describing the group permissions. */
    group: AclEntry;
    /** ACL entry describing the permissions for everyone else. */
    other: AclEntry;
    /**
     * Sticky bit status of the object. When true, only the owner/root
     * may delete the resource or (for a folder) its contents.
     */
    stickyBit: boolean;
}
/**
 * Kind of an ACL entry. This is the first field of an entry string, or the second
 * one when a scope prefix is present - e.g. `user` in user:bob:rwx, `group` in default:group::r--
 */
export enum AclEntryType {
    /** Entry that applies to one specific user. */
    user = 'user',
    /** Entry that applies to one specific group. */
    group = 'group',
    /** Mask entry. */
    mask = 'mask',
    /** Entry for all remaining users that no more specific entry type covers. */
    other = 'other'
}
/**
 * Which part of a file's permission field an entry stands for - the same field that commands
 * such as chmod work with. It is usually written as a 3 digit octal (e.g. 740) whose digits are,
 * in order, the owner, the group and other. @see parseAclPermissionFromOctal
 */
export enum AclPermissionType {
    /** First octal digit. */
    owner = 'owner',
    /** Second octal digit. */
    group = 'group',
    /** Third octal digit. */
    other = 'other'
}
/**
 * Scope of an ACL entry. In the string form only `default` is written out as a prefix.
 */
export enum AclEntryScope {
    /** Entry that is consulted during permission checks to enforce permissions. */
    access = 'access',
    /** Entry applied to the children of a directory that have no ACL of their own. */
    default = 'default'
}
/**
 * Read / write / execute flags of a single ACL.
 */
export class AclEntryPermission {
    constructor(public read: boolean, public write: boolean, public execute: boolean) { }

    /**
     * Renders the flags as three characters, [r-][w-][x-], where a dash marks a
     * permission that is not granted. Examples:
     *   rwx
     *   r--
     *   ---
     */
    public toString() {
        const symbol = (granted: boolean, letter: string) => granted ? letter : '-';
        return `${symbol(this.read, 'r')}${symbol(this.write, 'w')}${symbol(this.execute, 'x')}`;
    }
}
/**
 * Converts the textual form of a permission into an AclEntryPermission. The text is compared
 * case-insensitively and must be exactly 3 characters: r or -, then w or -, then x or -.
 * Valid examples:
 *   rwx
 *   ---
 *   -w-
 * @param permissionString The string representation of the permission
 * @throws Error if the string does not have that shape
 */
function parseAclPermission(permissionString: string): AclEntryPermission {
    const normalized = permissionString.toLowerCase();
    const isWellFormed = /^[r\-][w\-][x\-]$/i.test(normalized);
    if (!isWellFormed) {
        throw new Error(`Invalid permission string ${normalized}- must match /^[r\-][w\-][x\-]$/i`);
    }
    const canRead = normalized.charAt(0) === 'r';
    const canWrite = normalized.charAt(1) === 'w';
    const canExecute = normalized.charAt(2) === 'x';
    return new AclEntryPermission(canRead, canWrite, canExecute);
}
/**
 * One ACL entry, made up of up to 4 values:
 *   scope - The scope of the entry @see AclEntryScope
 *   type - The type of the entry @see AclEntryType
 *   name - The name of the user/group. May be empty.
 *   permission - The permission set for this ACL. @see AclEntryPermission
 */
export class AclEntry {
    constructor(
        public readonly scope: AclEntryScope,
        public readonly type: AclEntryType | AclPermissionType,
        public readonly name: string,
        public readonly permission: AclEntryPermission,
    ) { }

    /**
     * Formats the entry as [SCOPE:]TYPE:NAME:PERMISSION.
     * The scope is only written when it is `default`; an entry without a scope prefix is an
     * access entry. The name may be empty.
     * Example strings :
     *   user:bob:rwx
     *   default:user:bob:rwx
     *   user::r-x
     *   default:group::r--
     */
    toString(): string {
        const scopePrefix = this.scope === AclEntryScope.default ? 'default:' : '';
        const permissionText = this.permission.toString();
        return `${scopePrefix}${this.type}:${this.name}:${permissionText}`;
    }
}
/**
 * Parses a complete ACL string into separate AclEntry objects, one per entry. Entries are
 * separated by a comma.
 *
 * Each entry must match (default:)?(user|group|mask|other):([A-Za-z_][A-Za-z0-9._-]*)?:([rwx-]{3})?
 * e.g. the following are all valid entries
 *   user:bob:rwx
 *   user::rwx
 *   default:user:bob:rwx
 *   group::r-x
 *   default:other::r--
 *
 * So a valid ACL string might look like this
 *   user:bob:rwx,user::rwx,default:user:bob:rwx,group::r-x,default:other::r--
 *
 * Note that the pattern lets an entry omit its permission, but such an entry is still rejected
 * afterwards when parseAclEntry parses the (empty) permission.
 * @param aclString The string representation of the ACL
 * @throws Error if the string does not match the pattern above
 */
export function parseAcl(aclString: string): AclEntry[] {
    // Build the full pattern from the single-entry pattern so that the validation and the
    // pattern reported in the error message can never drift apart.
    const entryPattern = '(default:)?(user|group|mask|other):([A-Za-z_][A-Za-z0-9._-]*)?:([rwx-]{3})?';
    const aclPattern = new RegExp(`^${entryPattern}(,${entryPattern})*$`);
    if (!aclPattern.test(aclString)) {
        throw new Error(`Invalid ACL string ${aclString}. Expected to match ${aclPattern.source}`);
    }
    return aclString.split(',').map(aclEntryString => parseAclEntry(aclEntryString));
}
/**
 * Parses the string representation of a single ACL entry - [default:]TYPE:NAME:PERMISSION -
 * into an AclEntry object.
 *
 * Only the structure is checked here (field count, scope, type, permission); the name is taken
 * as-is, so callers wanting full validation should go through parseAcl.
 * @param aclString The string representation of the ACL entry
 * @throws Error if the entry does not have 3 or 4 fields, if a 4-field entry does not start
 * with `default`, if the type is unknown or if the permission is malformed
 */
export function parseAclEntry(aclString: string): AclEntry {
    const parts: string[] = aclString.split(':');
    if (parts.length !== 3 && parts.length !== 4) {
        throw new Error(`Invalid ACL entry ${aclString} - expected [default:]TYPE:NAME:PERMISSION`);
    }

    let scope: AclEntryScope = AclEntryScope.access;
    if (parts.length === 4) {
        // A leading 4th field can only be the scope, and only `default` is ever written out.
        // Anything else used to be dropped silently and the entry treated as an access entry.
        const scopePart = parts.shift();
        if (scopePart !== 'default') {
            throw new Error(`Unknown ACL Entry scope ${scopePart}`);
        }
        scope = AclEntryScope.default;
    }

    const typePart = parts[0];
    const name = parts[1];
    const permissionPart = parts[2];

    let type: AclEntryType;
    switch (typePart) {
        case 'user':
            type = AclEntryType.user;
            break;
        case 'group':
            type = AclEntryType.group;
            break;
        case 'mask':
            type = AclEntryType.mask;
            break;
        case 'other':
            type = AclEntryType.other;
            break;
        default:
            throw new Error(`Unknown ACL Entry type ${typePart}`);
    }

    const permission = parseAclPermission(permissionPart);
    return new AclEntry(scope, type, name, permission);
}
/**
 * Parses an octal in the form ### into a set of @see AclEntryPermission. Each digit in the octal
 * corresponds to a particular user type - owner, group and other respectively.
 * Each digit must be a value between 0 and 7 inclusive, which is a bitwise OR of the permission
 * flags for the file.
 *   4 - Read
 *   2 - Write
 *   1 - Execute
 * So an octal of 730 would map to :
 *   - The owner with rwx permissions
 *   - The group with -wx permissions
 *   - All others with --- permissions
 * @param octal The octal string to parse
 * @throws Error if the value is not a string of exactly 3 octal digits
 */
export function parseAclPermissionFromOctal(octal: string): { owner: AclEntryPermission, group: AclEntryPermission, other: AclEntryPermission } {
    if (!octal || octal.length !== 3) {
        throw new Error(`Invalid octal ${octal} - it must be a 3 digit string`);
    }
    // Without this check a digit such as 9 or a letter would be silently turned into a
    // (wrong) permission set instead of being reported.
    if (!/^[0-7]{3}$/.test(octal)) {
        throw new Error(`Invalid octal ${octal} - each digit must be between 0 and 7`);
    }

    const toPermission = (digit: string): AclEntryPermission => {
        const flags = parseInt(digit, 8);
        return new AclEntryPermission((flags & 4) === 4, (flags & 2) === 2, (flags & 1) === 1);
    };

    return {
        owner: toPermission(octal[0]),
        group: toPermission(octal[1]),
        other: toPermission(octal[2])
    };
}

View File

@@ -0,0 +1,884 @@
// This code is originally from https://github.com/harrisiirak/webhdfs
// License: https://github.com/harrisiirak/webhdfs/blob/master/LICENSE
import * as url from 'url';
import * as fs from 'fs';
import * as querystring from 'querystring';
import * as request from 'request';
import * as BufferStreamReader from 'buffer-stream-reader';
import { Cookie } from 'tough-cookie';
import * as through from 'through2';
import * as nls from 'vscode-nls';
import * as auth from '../util/auth';
import { IHdfsOptions, IRequestParams } from '../objectExplorerNodeProvider/fileSources';
import { IAclStatus, AclEntry, parseAcl, AclPermissionType, parseAclPermissionFromOctal, AclEntryScope } from './aclEntry';
const localize = nls.loadMessageBundle();
const ErrorMessageInvalidDataStructure = localize('webhdfs.invalidDataStructure', "Invalid Data Structure");
/**
 * Emits 'error' followed by 'finish' on the given emitter, but only the first time it is
 * called for that emitter. The `errorEmitted` flag stored on the instance tracks this.
 */
const emitError = (instance, err) => {
    const alreadyReported = instance.errorEmitted;
    if (alreadyReported) {
        instance.errorEmitted = true;
        return;
    }
    instance.emit('error', err);
    instance.emit('finish');
    instance.errorEmitted = true;
};
export class WebHDFS {
private _requestParams: IRequestParams;
private _opts: IHdfsOptions;
private _url: any;
private _authCookie: Cookie;
/**
 * Creates a WebHDFS client.
 *
 * @param opts Connection options. `host`, `port` and `path` must all be present and truthy.
 * @param requestParams Parameters merged into every request. A falsy value is replaced by an
 * empty object; `timeout` is set to 10000 on the object when it has no truthy value.
 * @throws Error listing the required options that are missing
 */
constructor(opts: IHdfsOptions, requestParams: IRequestParams) {
    if (!(this instanceof WebHDFS)) {
        return new WebHDFS(opts, requestParams);
    }

    const missingProps = ['host', 'port', 'path']
        .filter(p => !opts.hasOwnProperty(p) || !opts[p]);
    if (missingProps.length > 0) {
        // The positional placeholder is {0}; the previous "${0}" left a stray '$' in the message.
        throw new Error(localize('webhdfs.missingProperties',
            "Unable to create WebHDFS client due to missing options: {0}", missingProps.join(', ')));
    }

    this._requestParams = requestParams || {};
    this._requestParams.timeout = this._requestParams.timeout || 10000;

    this._opts = opts;
    this._url = {
        protocol: opts.protocol || 'http',
        hostname: opts.host.trim(),
        port: opts.port || 80,
        pathname: opts.path
    };
}
/**
 * Throws when a required argument has a falsy value.
 *
 * @param argName Name of the argument, used in the error message
 * @param argValue Value to check
 * @throws Error if the value is falsy
 */
private checkArgDefined(argName: string, argValue: any): void {
    if (!argValue) {
        // The positional placeholder is {0}; the previous "${0}" left a stray '$' in the message.
        throw new Error(localize('webhdfs.undefinedArgument', "'{0}' is undefined.", argName));
    }
}
/**
 * Builds the WebHDFS REST API endpoint URL for an operation on a path.
 *
 * The query string holds `op`, then `user.name` when a user is configured, then any
 * extra parameters. Note that the shared `_url` object is updated in place on every call.
 *
 * @param operation WebHDFS operation name
 * @param path Path appended to the configured base path
 * @param params Additional query string parameters
 * @returns WebHDFS REST API endpoint URL
 */
private getOperationEndpoint(operation: string, path: string, params?: object): string {
    const query: { [key: string]: any } = { 'op': operation };
    if (this._opts.user) {
        query['user.name'] = this._opts.user;
    }
    Object.assign(query, params || {});

    const target = this._url;
    target.pathname = this._opts.path + path;
    target.search = querystring.stringify(query);
    return url.format(target);
}
/**
 * Maps an HTTP status code to its localized status message.
 *
 * @param statusCode Http status code
 * @returns The status message, or undefined for a code that has no message defined here
 */
private toStatusMessage(statusCode: number): string {
    switch (statusCode) {
        case 400:
            return localize('webhdfs.httpError400', "Bad Request");
        case 401:
            return localize('webhdfs.httpError401', "Unauthorized");
        case 403:
            return localize('webhdfs.httpError403', "Forbidden");
        case 404:
            return localize('webhdfs.httpError404', "Not Found");
        case 500:
            return localize('webhdfs.httpError500', "Internal Server Error");
        // TODO: define more messages here
        default:
            return undefined;
    }
}
/**
 * Picks the status message for a response: the localized message for its status code
 * when one is defined, otherwise the status message carried by the response itself.
 *
 * @param response response object
 * @returns The status message, or undefined when there is no response
 */
private getStatusMessage(response: request.Response): string {
    if (!response) {
        return undefined;
    }
    const localized = this.toStatusMessage(response.statusCode);
    return localized || response.statusMessage;
}
/**
 * Gets the remote exception message (`RemoteException.message`) from a response body.
 *
 * @param responseBody Response body, either already parsed or as a JSON string
 * @returns The message, or undefined when the body is empty, is not valid JSON or does not
 * contain a RemoteException with a message
 */
private getRemoteExceptionMessage(responseBody: any): string {
    if (!responseBody) {
        return undefined;
    }

    let body = responseBody;
    if (typeof body === 'string') {
        try {
            body = JSON.parse(body);
        } catch {
            // Not JSON, so there is no remote exception to read
            return undefined;
        }
    }

    // JSON.parse can legitimately return null (and RemoteException may be null too), so every
    // level is checked before looking at its properties - this runs while reporting an error
    // and must not throw itself.
    const hasOwn = (target: any, key: string): boolean =>
        target !== null && typeof target === 'object' && Object.prototype.hasOwnProperty.call(target, key);

    if (hasOwn(body, 'RemoteException') && hasOwn(body.RemoteException, 'message')) {
        return body.RemoteException.message;
    }
    return undefined;
}
/**
 * Builds the most descriptive error message available.
 *
 * When both a status message and a remote exception message exist the result is
 * "status (remote)". Otherwise the first available of status message, remote exception
 * message and message of the error is used, with a localized "Unknown Error" as last resort.
 *
 * @param statusMessage status message
 * @param [remoteExceptionMessage] remote exception message
 * @param [error] error
 * @returns error message
 */
private getErrorMessage(statusMessage: string, remoteExceptionMessage?: string, error?: any): string {
    const status = statusMessage === '' ? undefined : statusMessage;
    const remote = remoteExceptionMessage === '' ? undefined : remoteExceptionMessage;
    const fromError: string = error ? (error['message'] || error.toString()) : undefined;

    if (status && remote) {
        return `${status} (${remote})`;
    }
    return status || remote || fromError || localize('webhdfs.unknownError', "Unknown Error");
}
/**
 * Turns the outcome of a failed request into an HdfsError.
 *
 * @param response response object
 * @param [responseBody] response body; the body of the response is used when this is not given
 * @param [error] error
 * @returns HdfsError object
 */
private parseError(response: request.Response, responseBody?: any, error?: any): HdfsError {
    const statusMessage: string = this.getStatusMessage(response);
    const body = (!responseBody && response) ? response.body : responseBody;
    const remoteExceptionMessage: string = this.getRemoteExceptionMessage(body);
    const errorMessage: string = this.getErrorMessage(statusMessage, remoteExceptionMessage, error);

    return new HdfsError(
        errorMessage,
        response && response.statusCode,
        response && response.statusMessage,
        remoteExceptionMessage,
        error);
}
/**
 * Tells whether a response is a redirect, i.e. has status 301 or 307 and a location header.
 *
 * @param response response object
 * @returns if response is redirect
 */
private isRedirect(response: request.Response): boolean {
    const code = response.statusCode;
    if (code !== 301 && code !== 307) {
        return false;
    }
    return response.headers.hasOwnProperty('location');
}
/**
 * Tells whether a response is successful, i.e. has status 200 or 201.
 *
 * @param response response object
 * @returns if response is successful
 */
private isSuccess(response: request.Response): boolean {
    const code = response.statusCode;
    return code === 200 || code === 201;
}
/**
 * Tells whether a response carries an HTTP error status, i.e. any 4xx or 5xx code.
 *
 * Only a fixed handful of codes used to be recognised, which made every other failure
 * (405, 409, 503, ...) fall through to the callers' non-error branches.
 *
 * @param response response object
 * @returns if response is error
 */
private isError(response: request.Response): boolean {
    return response.statusCode >= 400;
}
/**
 * Sends a request to the WebHDFS REST API. Nothing is sent when no callback is given.
 *
 * The request parameters are, in increasing precedence: method/url/json, the client wide
 * request parameters and `opts`. A 401 failure is retried with kerberos authentication
 * when the client is configured for kerberos.
 *
 * @param method HTTP method
 * @param urlValue URL to send the request to
 * @param opts Options for request
 * @param callback Called with the error (if any) and the response
 * @returns void
 */
private sendRequest(method: string, urlValue: string, opts: object, callback: (error: HdfsError, response: request.Response) => void): void {
    if (!callback) {
        return;
    }

    const requestParams = Object.assign(
        { method: method, url: urlValue, json: true },
        this._requestParams,
        opts || {}
    );
    this.ensureCookie(requestParams);

    // Wrap the callback so that unauthorized requests go through the kerberos auth steps
    this.doSendRequest(requestParams, (error, response) => {
        const shouldAuthenticate = error && error.statusCode === 401 && this._requestParams.isKerberos;
        if (shouldAuthenticate) {
            this.requestWithKerberosSync(requestParams, callback);
            return;
        }
        callback(error, response);
    });
}
/**
 * Adds the saved auth cookie to the request headers, provided one is saved and its expiry
 * time is still in the future. The headers object is created when missing.
 *
 * @param requestParams Request parameters to update in place
 */
private ensureCookie(requestParams: { headers?: {} }) {
    const cookie = this._authCookie;
    const isUsable = cookie && cookie.expiryTime() > Date.now();
    if (!isUsable) {
        return;
    }
    requestParams.headers = requestParams.headers || {};
    requestParams.headers['cookie'] = `${cookie.key}=${cookie.value}`;
}
/**
 * Performs the request and reports the outcome through the callback: an HdfsError for a
 * failed request or an error response, no error for a successful response, and an
 * "Unexpected Redirect" HdfsError for anything else.
 *
 * @param requestParams Parameters handed to the request library
 * @param callback Called with the error (if any) and the response
 */
private doSendRequest(requestParams: any, callback: (error: HdfsError, response: any) => void): void {
    request(requestParams, (error, response, body) => {
        if (error || this.isError(response)) {
            callback(this.parseError(response, body, error), response);
            return;
        }
        if (this.isSuccess(response)) {
            callback(undefined, response);
            return;
        }
        const redirectError = new HdfsError(
            localize('webhdfs.unexpectedRedirect', "Unexpected Redirect"),
            response && response.statusCode,
            response && response.statusMessage,
            this.getRemoteExceptionMessage(body || response.body),
            error);
        callback(redirectError, response);
    });
}
/**
 * Authenticates using kerberos as part of a request, and saves the cookie if successful.
 * Ideally this would use request's built-in cookie functionality, but that isn't working with
 * non-public domains. Instead the cookie is saved in this module and reused while not expired.
 *
 * @param requestParams Request parameters; the kerberos header is set on them
 * @param callback Called with the error (if any) and the response
 */
private requestWithKerberosSync(requestParams: any, callback: (error: HdfsError, response: request.Response) => void) {
    this.setKerberosAuthOnParams(requestParams)
        .then(() => {
            this.doSendRequest(requestParams, (error, response) => {
                if (!error) {
                    // Capture cookie for future requests
                    this.setAuthCookie(response);
                }
                callback(error, response);
            });
        })
        .catch((err) => {
            callback(err, undefined);
        });
}
/**
 * Obtains a kerberos token for the configured host and puts it on the request as a
 * `Negotiate` Authorization header. The headers object is replaced as a whole, so
 * headers that were set before this call are dropped.
 *
 * @param requestParams Request parameters to update in place
 */
private async setKerberosAuthOnParams(requestParams: any): Promise<void> {
    const token = await auth.authenticateKerberos(this._opts.host);
    const headers = { Authorization: `Negotiate ${token}` };
    requestParams.headers = headers;
    return requestParams;
}
/**
 * Saves the first cookie of the response's set-cookie header for use in later requests.
 * This is best effort: a missing header leaves the saved cookie untouched and any failure
 * while parsing is ignored.
 *
 * @param response Response to read the set-cookie header from
 */
private setAuthCookie(response: request.Response) {
    try {
        const hasCookieHeader = response && response.headers && response.headers['set-cookie'];
        if (!hasCookieHeader) {
            return;
        }
        const parsed: Cookie[] = response.headers['set-cookie'] instanceof Array
            ? response.headers['set-cookie'].map(c => Cookie.parse(c))
            : [Cookie.parse(response.headers['set-cookie'])];
        this._authCookie = parsed[0];
    } catch { }
}
/**
 * Changes the permissions of a path (setpermission operation).
 *
 * @param path Path to change the permissions of
 * @param mode Permission value sent as the `permission` parameter
 * @param callback Called with the error, if any
 * @returns void
 */
public chmod(path: string, mode: string, callback: (error: HdfsError) => void): void {
    this.checkArgDefined('path', path);
    this.checkArgDefined('mode', mode);

    const query = { permission: mode };
    const endpoint = this.getOperationEndpoint('setpermission', path, query);
    this.sendRequest('PUT', endpoint, undefined, (error) => {
        if (callback) {
            callback(error);
        }
    });
}
/**
 * Changes the owner of a path (setowner operation).
 *
 * @param path Path to change the owner of
 * @param userId User name
 * @param groupId Group name
 * @param callback Called with the error, if any
 * @returns void
 */
public chown(path: string, userId: string, groupId: string, callback: (error: HdfsError) => void): void {
    this.checkArgDefined('path', path);
    this.checkArgDefined('userId', userId);
    this.checkArgDefined('groupId', groupId);

    const ownership = { owner: userId, group: groupId };
    const endpoint = this.getOperationEndpoint('setowner', path, ownership);
    this.sendRequest('PUT', endpoint, undefined, (error) => {
        if (!callback) {
            return;
        }
        callback(error);
    });
}
/**
 * Reads the contents of a directory (liststatus operation).
 *
 * @param path Directory to list
 * @param callback Called with either an error or the `FileStatuses.FileStatus` value of the
 * response; a response without that value is reported as an invalid data structure
 * @returns void
 */
public readdir(path: string, callback: (error: HdfsError, files: any[]) => void): void {
    this.checkArgDefined('path', path);

    const endpoint = this.getOperationEndpoint('liststatus', path);
    this.sendRequest('GET', endpoint, undefined, (error, response) => {
        if (!callback) {
            return;
        }
        if (error) {
            callback(error, undefined);
            return;
        }
        const hasListing = response.body.hasOwnProperty('FileStatuses')
            && response.body.FileStatuses.hasOwnProperty('FileStatus');
        if (!hasListing) {
            callback(new HdfsError(ErrorMessageInvalidDataStructure), undefined);
            return;
        }
        const files: any[] = response.body.FileStatuses.FileStatus;
        callback(undefined, files);
    });
}
/**
 * Makes a new directory (mkdirs operation).
 *
 * @param path Directory to create
 * @param permission Permission for the new directory, '0755' by default
 * @param callback Called with the error, if any
 * @returns void
 */
public mkdir(path: string, permission: string = '0755', callback: (error: HdfsError) => void): void {
    this.checkArgDefined('path', path);

    const query = { permission: permission };
    const endpoint = this.getOperationEndpoint('mkdirs', path, query);
    this.sendRequest('PUT', endpoint, undefined, (error) => {
        if (!callback) {
            return;
        }
        callback(error);
    });
}
/**
 * Renames a path (rename operation).
 *
 * @param path Path to rename
 * @param destination New path
 * @param callback Called with the error, if any
 * @returns void
 */
public rename(path: string, destination: string, callback: (error: HdfsError) => void): void {
    this.checkArgDefined('path', path);
    this.checkArgDefined('destination', destination);

    const query = { destination: destination };
    const endpoint = this.getOperationEndpoint('rename', path, query);
    this.sendRequest('PUT', endpoint, undefined, (error) => {
        if (!callback) {
            return;
        }
        callback(error);
    });
}
/**
 * Gets the file status for a path (getfilestatus operation).
 *
 * @param path Path to get the status of
 * @param callback Called with either an error or the `FileStatus` value of the response;
 * a response without that value is reported as an invalid data structure
 * @returns void
 */
public stat(path: string, callback: (error: HdfsError, fileStatus: any) => void): void {
    this.checkArgDefined('path', path);

    const endpoint = this.getOperationEndpoint('getfilestatus', path);
    this.sendRequest('GET', endpoint, undefined, (error, response) => {
        if (!callback) {
            return;
        }
        if (error) {
            callback(error, undefined);
            return;
        }
        if (!response.body.hasOwnProperty('FileStatus')) {
            callback(new HdfsError(ErrorMessageInvalidDataStructure), undefined);
            return;
        }
        callback(undefined, response.body.FileStatus);
    });
}
/**
 * Get ACL status for given path (getaclstatus operation).
 *
 * The owner/group/other entries are built from the octal `permission` of the response, and
 * each string in `entries` is parsed into ACL entries. A response without `AclStatus`, or one
 * whose contents cannot be parsed, is reported through the callback as an error.
 * @param path The path to the file/folder to get the status of
 * @param callback Callback to handle the response
 * @returns void
 */
public getAclStatus(path: string, callback: (error: HdfsError, aclStatus: IAclStatus) => void): void {
    this.checkArgDefined('path', path);

    const endpoint = this.getOperationEndpoint('getaclstatus', path);
    this.sendRequest('GET', endpoint, undefined, (error, response) => {
        if (!callback) {
            return;
        }
        if (error) {
            callback(error, undefined);
            return;
        }
        if (!response.body.hasOwnProperty('AclStatus')) {
            callback(new HdfsError(ErrorMessageInvalidDataStructure), undefined);
            return;
        }

        const rawStatus = response.body.AclStatus;
        let aclStatus: IAclStatus;
        try {
            const permissions = parseAclPermissionFromOctal(rawStatus.permission);
            const rawEntries: any[] = rawStatus.entries || [];
            aclStatus = {
                owner: new AclEntry(AclEntryScope.access, AclPermissionType.owner, rawStatus.owner || '', permissions.owner),
                group: new AclEntry(AclEntryScope.access, AclPermissionType.group, rawStatus.group || '', permissions.group),
                other: new AclEntry(AclEntryScope.access, AclPermissionType.other, rawStatus.other || '', permissions.other),
                stickyBit: !!rawStatus.stickyBit,
                // The initial value matters: reduce() without one throws on an empty array,
                // which is exactly what an object without extra ACL entries produces.
                entries: rawEntries.reduce((acc: AclEntry[], entry) => acc.concat(parseAcl(entry)), [])
            };
        } catch (parseFailure) {
            // Hand parse failures to the caller instead of throwing out of the request callback
            const message = (parseFailure && parseFailure.message) || ErrorMessageInvalidDataStructure;
            callback(new HdfsError(message, undefined, undefined, undefined, parseFailure), undefined);
            return;
        }
        callback(undefined, aclStatus);
    });
}
/**
 * Set ACL for the given path (setacl operation). The entries are sent as a single
 * comma separated `aclspec` parameter.
 * @param path The path to the file/folder to set the ACL on
 * @param aclEntries The ACL entries to set
 * @param callback Callback to handle the response
 * @returns void
 */
public setAcl(path: string, aclEntries: AclEntry[], callback: (error: HdfsError) => void): void {
    this.checkArgDefined('path', path);
    this.checkArgDefined('aclEntries', aclEntries);

    const query = { aclspec: aclEntries.join(',') };
    const endpoint = this.getOperationEndpoint('setacl', path, query);
    this.sendRequest('PUT', endpoint, undefined, (error) => {
        if (callback) {
            callback(error);
        }
    });
}
/**
 * Checks whether a path exists by requesting its file status.
 *
 * @see WebHDFS.stat
 * @param path Path to check
 * @param callback Called with the error reported by stat (if any) and whether a file
 * status was returned
 * @returns void
 */
public exists(path: string, callback: (error: HdfsError, exists: boolean) => void): void {
    this.checkArgDefined('path', path);

    this.stat(path, (error, fileStatus) => {
        callback(error, !!fileStatus);
    });
}
/**
 * Writes data to a file by piping it into a write stream for the path.
 *
 * Note that the callback is only invoked when the stream reported an error; to observe
 * successful completion listen for 'finish' on the returned stream.
 *
 * @param path File to write to
 * @param data Content to write
 * @param append If set to true then append data to the file
 * @param opts Extra parameters passed on to createWriteStream
 * @param callback Called with the error once the stream has finished after an error
 * @returns The stream the data is written to
 */
public writeFile(path: string, data: string | Buffer, append: boolean, opts: object,
    callback: (error: HdfsError) => void): fs.WriteStream {
    this.checkArgDefined('path', path);
    this.checkArgDefined('data', data);

    let failure: HdfsError = null;
    const source = new BufferStreamReader(data);
    const target: fs.WriteStream = this.createWriteStream(path, !!append, opts || {});

    // Remember the error so that it can be reported once the stream finishes
    target.once('error', (err) => {
        failure = <HdfsError>err;
    });
    target.once('finish', () => {
        if (!callback || !failure) {
            return;
        }
        callback(failure);
    });

    source.pipe(target);
    return target;
}
/**
 * Appends data to a file. Same as writeFile with `append` set to true.
 *
 * @see writeFile
 */
public appendFile(path: string, data: string | Buffer, opts: object, callback: (error: HdfsError) => void): fs.WriteStream {
    const append = true;
    return this.writeFile(path, data, append, opts, callback);
}
/**
 * Reads the whole content of a file into a buffer.
 *
 * @fires Request#data
 * @fires WebHDFS#finish
 * @param path File to read
 * @param callback Called once the read stream finishes, with either the error emitted by
 * the stream or the concatenated data
 * @returns void
 */
public readFile(path: string, callback: (error: HdfsError, buffer: Buffer) => void): void {
    this.checkArgDefined('path', path);

    const source = this.createReadStream(path);
    const chunks: any[] = [];
    let failure: HdfsError = undefined;

    source.once('error', (err) => {
        failure = <HdfsError>err;
    });
    source.on('data', (chunk) => {
        chunks.push(chunk);
    });
    source.once('finish', () => {
        if (!callback) {
            return;
        }
        if (failure) {
            callback(failure, undefined);
            return;
        }
        callback(undefined, Buffer.concat(chunks));
    });
}
/**
 * Creates a writable stream for the given path, using the `append` or the `create`
 * operation. `overwrite: true` and `permission: '0755'` are sent unless `opts` overrides them.
 *
 * @fires WebHDFS#finish
 * @param path File to write to
 * @param [append] If set to true then append data to the file
 * @param [opts] Extra query parameters for the operation
 *
 * @example
 * let hdfs = WebHDFS.createClient();
 *
 * let localFileStream = fs.createReadStream('/path/to/local/file');
 * let remoteFileStream = hdfs.createWriteStream('/path/to/remote/file');
 *
 * localFileStream.pipe(remoteFileStream);
 *
 * remoteFileStream.on('error', (err) => {
 * 	// Do something with the error
 * });
 *
 * remoteFileStream.on('finish', () => {
 * 	// Upload is done
 * });
 */
public createWriteStream(path: string, append?: boolean, opts?: object): fs.WriteStream {
    this.checkArgDefined('path', path);

    const operation = append ? 'append' : 'create';
    const query = Object.assign(
        {
            overwrite: true,
            permission: '0755'
        },
        opts || {}
    );
    const endpoint = this.getOperationEndpoint(operation, path, query);

    const params: any = Object.assign(
        {
            method: append ? 'POST' : 'PUT',
            url: endpoint,
            json: true,
        },
        this._requestParams
    );
    params.headers = params.headers || {};
    params.headers['content-type'] = 'application/octet-stream';

    if (this._requestParams.isKerberos) {
        // Must add kerberos token and handle redirects
        return this.createKerberosWriteStream(params);
    }
    return this.doCreateWriteStream(params);
}
/**
 * Creates a write stream for a kerberos authenticated upload.
 *
 * The caller gets an intermediate pass-through stream straight away. The authenticated,
 * redirected request is set up in the background by requestWithRedirectAndAuth and the
 * intermediate stream is only piped into the real upload once that has succeeded.
 *
 * @param params Request parameters; they are modified in place
 * @returns The intermediate stream, typed as a write stream
 */
private createKerberosWriteStream(params: any): fs.WriteStream {
// The redirect is handled manually by requestWithRedirectAndAuth
params.followRedirect = false;
// Create an intermediate stream that pauses until we get a positive
// response from the server
let isWaiting = true;
let firstCb: Function = undefined;
let replyStream = through(function (chunk, enc, cb) {
this.push(chunk, enc);
if (isWaiting) {
// Hold on to the callback instead of calling it; it is called by
// initRedirectedStream once the upload request exists
firstCb = cb;
} else {
cb();
}
});
// Reports a failure of the auth/redirect step on the intermediate stream and closes it
let handleErr = (err) => {
replyStream.emit('error', err);
replyStream.end();
};
let initRedirectedStream = () => {
// After redirect, create valid stream to correct location
// and pipe the intermediate stream to it, unblocking the data flow
// The content type is set again here because setKerberosAuthOnParams replaces the headers object
params.headers['content-type'] = 'application/octet-stream';
let upload = request(params, (err, res, bo) => {
if (err || this.isError(res)) {
emitError(replyStream, this.parseError(res, bo, err));
replyStream.end();
}
else if (res.headers.hasOwnProperty('location')) {
// Pass the location header of the response along with the finish event
replyStream.emit('finish', res.headers.location);
}
else {
replyStream.emit('finish');
}
});
isWaiting = false;
replyStream.pipe(upload);
// Release the chunk that was held back while waiting, if there was one
if (firstCb) {
firstCb();
}
};
this.requestWithRedirectAndAuth(params, initRedirectedStream, handleErr);
return <fs.WriteStream><any>replyStream;
}
/**
 * Creates a write stream for an upload without kerberos authentication.
 *
 * The returned object is the initial request. When a source stream is piped into it, the
 * source is paused and unpiped and the initial request is ended without a body. If the
 * response to it is a redirect, a second request is made to the redirect location and the
 * source is piped into that one instead. Its outcome is reported through events emitted on
 * the initial request.
 *
 * @param params Request parameters; `url` is overwritten when a redirect is followed
 * @returns The initial request, typed as a write stream
 */
private doCreateWriteStream(params: any): fs.WriteStream {
// Whether the source stream is allowed to start flowing
let canResume: boolean = true;
// The source stream, set once something is piped into the request
let stream = undefined;
let req = request(params, (error, response, body) => {
// Handle redirect only if there was not an error (e.g. res is defined)
if (response && this.isRedirect(response)) {
let upload = request(Object.assign(params, { url: response.headers.location }), (err, res, bo) => {
if (err || this.isError(res)) {
emitError(req, this.parseError(res, bo, err));
req.end();
}
else if (res.headers.hasOwnProperty('location')) {
// Pass the location header of the response along with the finish event
req.emit('finish', res.headers.location);
}
else {
req.emit('finish');
}
});
canResume = true; // Enable resume
// NOTE(review): stream is only set by the 'pipe' handler below, so this assumes that a
// source was piped in before the response arrived - confirm
stream.pipe(upload);
stream.resume();
}
if (error && !response) {
// request failed, and req is not accessible in this case.
throw this.parseError(undefined, undefined, error);
}
if (error || this.isError(response)) {
emitError(req, this.parseError(response, body, error));
}
});
req.on('pipe', (src) => {
// Pause read stream
stream = src;
stream.pause();
// This is not an elegant solution but here we go
// Basically we don't allow pipe() method to resume reading input
// and set internal _readableState.flowing to false
canResume = false;
stream.on('resume', () => {
if (!canResume) {
stream._readableState.flowing = false;
}
});
// Unpipe initial request
src.unpipe(req);
req.end();
});
return <fs.WriteStream><any>req;
}
/**
 * Creates a readable stream for the given path (open operation).
 *
 * With kerberos enabled the returned stream is an intermediate pass-through stream that
 * is fed once the authenticated, redirected request has been set up.
 *
 * @fires Request#data
 * @fires WebHDFS#finish
 * @param path File to read
 * @param [opts] Extra query parameters for the operation
 *
 * @example
 * let hdfs = WebHDFS.createClient();
 *
 * let remoteFileStream = hdfs.createReadStream('/path/to/remote/file');
 *
 * remoteFileStream.on('error', (err) => {
 * 	// Do something with the error
 * });
 *
 * remoteFileStream.on('data', (dataChunk) => {
 * 	// Do something with the data chunk
 * });
 *
 * remoteFileStream.on('finish', () => {
 * 	// Download is done
 * });
 */
public createReadStream(path: string, opts?: object): fs.ReadStream {
    this.checkArgDefined('path', path);

    const endpoint = this.getOperationEndpoint('open', path, opts);
    const params: request.OptionsWithUrl = Object.assign(
        {
            method: 'GET',
            url: endpoint,
            json: true
        },
        this._requestParams
    );

    if (this._requestParams.isKerberos) {
        // Must add kerberos token and handle redirects
        params.followRedirect = false;
        const replyStream = through();
        const onFailure = (err) => {
            replyStream.emit('error', err);
            replyStream.end();
        };
        const onRedirected = () => {
            const redirectedStream = this.doCreateReadStream(params);
            redirectedStream.pipe(replyStream);
        };
        this.requestWithRedirectAndAuth(params, onRedirected, onFailure);
        return <fs.ReadStream><any>replyStream;
    }

    return <fs.ReadStream><any>this.doCreateReadStream(params);
}
/**
 * Sends a kerberos authenticated request that is expected to be answered with a 307
 * redirect. On a redirect the request url is changed to the redirect location, the
 * kerberos header is set again and `onRedirected` is called. Every other outcome is passed
 * to `handleErr`.
 *
 * @param params Request parameters; `url` and the headers are modified in place
 * @param onRedirected Called once the parameters point at the redirect location
 * @param handleErr Called with the error of the request or of the authentication
 */
private requestWithRedirectAndAuth(params: request.OptionsWithUrl, onRedirected: () => void, handleErr: (err: any) => void) {
    this.requestWithKerberosSync(params, (err, response: request.Response) => {
        const wasRedirected = err && err.statusCode === 307 && response.headers['location'];
        if (!wasRedirected) {
            handleErr(err);
            return;
        }
        // It's a redirect
        params.url = response.headers['location'];
        this.setKerberosAuthOnParams(params)
            .then(onRedirected)
            .catch(handleErr);
    });
}
/**
 * Starts the download request and returns it as a read stream.
 *
 * 'finish' is emitted on the returned request when the request (or the follow-up request
 * made for a redirect) completes. For an error response the registered data handlers are
 * removed and the response data is turned into an 'error' event instead.
 *
 * @param params Request parameters
 * @returns The request, typed as a read stream
 */
private doCreateReadStream(params: request.OptionsWithUrl): fs.ReadStream {
let req: request.Request = request(params);
// Signal the end of the download to listeners of 'finish'
req.on('complete', (response) => {
req.emit('finish');
});
req.on('response', (response) => {
// Handle remote exceptions
// Remove all data handlers and parse error data
if (this.isError(response)) {
req.removeAllListeners('data');
req.on('data', (data) => {
req.emit('error', this.parseError(response, data.toString()));
req.end();
});
}
else if (this.isRedirect(response)) {
// NOTE(review): the follow-up request is made with the same params; the location header
// of the redirect is not applied to them here - confirm this is intended
let download = request(params);
download.on('complete', (response) => {
req.emit('finish');
});
// Proxy data to original data handler
// Not the nicest way but hey
download.on('data', (dataChunk) => {
req.emit('data', dataChunk);
});
// Handle subrequest
download.on('response', (response) => {
if (this.isError(response)) {
// Same error handling as for the original request, reported on the original request
download.removeAllListeners('data');
download.on('data', (data) => {
req.emit('error', this.parseError(response, data.toString()));
req.end();
});
}
});
}
// No need to interrupt the request
// data will be automatically sent to the data handler
});
return <fs.ReadStream><any>req;
}
/**
 * Creates a symbolic link to the destination path (createsymlink operation).
 *
 * @param src Path the operation is sent for
 * @param destination Value of the `destination` parameter
 * @param createParent Value of the `createParent` parameter, false by default
 * @param callback Called with the error, if any
 * @returns void
 */
public symlink(src: string, destination: string, createParent: boolean = false, callback: (error: HdfsError) => void): void {
    this.checkArgDefined('src', src);
    this.checkArgDefined('destination', destination);

    const query = {
        createParent: createParent,
        destination: destination
    };
    const endpoint = this.getOperationEndpoint('createsymlink', src, query);
    this.sendRequest('PUT', endpoint, undefined, (error) => {
        if (!callback) {
            return;
        }
        callback(error);
    });
}
/**
 * Deletes a path (delete operation).
 *
 * @param path Path to delete
 * @param recursive Value of the `recursive` parameter, false by default
 * @param callback Called with the error, if any
 * @returns void
 */
public unlink(path: string, recursive: boolean = false, callback: (error: HdfsError) => void): void {
    this.checkArgDefined('path', path);

    const query = { recursive: recursive };
    const endpoint = this.getOperationEndpoint('delete', path, query);
    this.sendRequest('DELETE', endpoint, undefined, (error) => {
        if (!callback) {
            return;
        }
        callback(error);
    });
}
/**
 * Deletes a directory. This simply forwards to unlink.
 *
 * @alias WebHDFS.unlink
 * @returns void
 */
public rmdir(path: string, recursive: boolean = false, callback: (error: HdfsError) => void): void {
    return this.unlink(path, recursive, callback);
}
/**
 * Creates a client, filling in `localhost`, port `50070` and path `/webhdfs/v1` for
 * the options that are not given.
 *
 * @param opts Connection options overriding the defaults
 * @param requestParams Request parameters handed to the constructor
 * @returns The new client
 */
public static createClient(opts, requestParams): WebHDFS {
    const defaults = {
        host: 'localhost',
        port: '50070',
        path: '/webhdfs/v1'
    };
    const mergedOpts = Object.assign(defaults, opts || {});
    return new WebHDFS(mergedOpts, requestParams);
}
}
/**
 * Error reported by the WebHDFS client. Besides the message it can carry the HTTP status of
 * the response, the message of the remote exception and the error that caused it.
 */
export class HdfsError extends Error {
    /** HTTP status code of the response, when there was one. */
    public statusCode?: number;
    /** HTTP status message of the response, when there was one. */
    public statusMessage?: string;
    /** Message of the remote exception found in the response body, if any. */
    public remoteExceptionMessage?: string;
    /** The underlying error, if any. */
    public internalError?: any;

    constructor(
        errorMessage: string,
        statusCode?: number,
        statusMessage?: string,
        remoteExceptionMessage?: string,
        internalError?: any) {
        super(errorMessage);
        this.statusCode = statusCode;
        this.statusMessage = statusMessage;
        this.remoteExceptionMessage = remoteExceptionMessage;
        this.internalError = internalError;
    }
}