Fix HDFS with AD auth for browse, read file scenarios (#6840)

* Fix HDFS with AD auth for browse, read
- HDFS now fully supports expanding nodes for all levels, including using cookie for auth
- HDFS now support reading files from HDFS
- HDFS write file is broken and will be fixed (either in PR update or separate PR)
- Removed hack to use gateway-0 instead of actual DNS name now these are supported. Needed for testing

* Fix Jupyter error using new DMV with endpoints
This commit is contained in:
Kevin Cunnane
2019-08-20 18:12:38 -07:00
committed by GitHub
parent 29c5977281
commit 1f00249646
7 changed files with 316 additions and 114 deletions

View File

@@ -16,7 +16,6 @@ import * as nls from 'vscode-nls';
import * as constants from '../constants';
import { WebHDFS, HdfsError } from './webhdfs';
import * as auth from '../util/auth';
const localize = nls.loadMessageBundle();
@@ -124,10 +123,6 @@ export class FileSourceFactory {
requestParams['agent'] = agent;
}
if (requestParams.isKerberos) {
let kerberosToken = await auth.authenticateKerberos(options.host);
requestParams.headers = { Authorization: `Negotiate ${kerberosToken}` };
}
return new HdfsFileSource(WebHDFS.createClient(options, requestParams));
}

View File

@@ -6,18 +6,20 @@ import * as fs from 'fs';
import * as querystring from 'querystring';
import * as request from 'request';
import * as BufferStreamReader from 'buffer-stream-reader';
import { Cookie } from 'tough-cookie';
import * as through from 'through2';
import * as nls from 'vscode-nls';
import * as auth from '../util/auth';
import { IHdfsOptions, IRequestParams } from './fileSources';
const localize = nls.loadMessageBundle();
const ErrorMessageInvalidDataStructure =
localize('webhdfs.invalidDataStructure', 'Invalid Data Structure');
const ErrorMessageInvalidDataStructure = localize('webhdfs.invalidDataStructure', "Invalid Data Structure");
export class WebHDFS {
private _requestParams: IRequestParams;
private _opts: IHdfsOptions;
private _url: any;
private _authCookie: Cookie;
constructor(opts: IHdfsOptions, requestParams: IRequestParams) {
if (!(this instanceof WebHDFS)) {
return new WebHDFS(opts, requestParams);
@@ -27,7 +29,7 @@ export class WebHDFS {
.filter(p => !opts.hasOwnProperty(p) || !opts[p]);
if (missingProps && missingProps.length > 0) {
throw new Error(localize('webhdfs.missingProperties',
'Unable to create WebHDFS client due to missing options: ${0}', missingProps.join(', ')));
"Unable to create WebHDFS client due to missing options: ${0}", missingProps.join(', ')));
}
this._requestParams = requestParams || {};
@@ -44,7 +46,7 @@ export class WebHDFS {
private checkArgDefined(argName: string, argValue: any): void {
if (!argValue) {
throw new Error(localize('webhdfs.undefinedArgument', '\'${0}\' is undefined.', argName));
throw new Error(localize('webhdfs.undefinedArgument', "'${0}' is undefined.", argName));
}
}
@@ -75,11 +77,11 @@ export class WebHDFS {
private toStatusMessage(statusCode: number): string {
let statusMessage: string = undefined;
switch (statusCode) {
case 400: statusMessage = localize('webhdfs.httpError400', 'Bad Request'); break;
case 401: statusMessage = localize('webhdfs.httpError401', 'Unauthorized'); break;
case 403: statusMessage = localize('webhdfs.httpError403', 'Forbidden'); break;
case 404: statusMessage = localize('webhdfs.httpError404', 'Not Found'); break;
case 500: statusMessage = localize('webhdfs.httpError500', 'Internal Server Error'); break;
case 400: statusMessage = localize('webhdfs.httpError400', "Bad Request");break;
case 401: statusMessage = localize('webhdfs.httpError401', "Unauthorized"); break;
case 403: statusMessage = localize('webhdfs.httpError403', "Forbidden"); break;
case 404: statusMessage = localize('webhdfs.httpError404', "Not Found"); break;
case 500: statusMessage = localize('webhdfs.httpError500', "Internal Server Error"); break;
// TODO: define more messages here
default: break;
}
@@ -136,7 +138,7 @@ export class WebHDFS {
return statusMessage && remoteExceptionMessage ?
`${statusMessage} (${remoteExceptionMessage})` :
statusMessage || remoteExceptionMessage || messageFromError ||
localize('webhdfs.unknownError', 'Unknown Error');
localize('webhdfs.unknownError', "Unknown Error");
}
/**
@@ -196,36 +198,94 @@ export class WebHDFS {
* @param opts Options for request
* @returns void
*/
private sendRequest(method: string, url: string, opts: object,
callback: (error: HdfsError, response: request.Response) => void): void {
private sendRequest(method: string, urlValue: string, opts: object, callback: (error: HdfsError, response: request.Response) => void): void {
if (!callback) {
return;
}
let requestParams = Object.assign(
{ method: method, url: url, json: true },
{ method: method, url: urlValue, json: true },
this._requestParams,
opts || {}
);
this.ensureCookie(requestParams);
// Add a wrapper to handle unauthorized requests by adding kerberos auth steps
let handler = (error, response) => {
if (error && error.statusCode === 401 && this._requestParams.isKerberos) {
this.requestWithKerberosSync(requestParams, callback);
} else {
callback(error, response);
}
};
this.doSendRequest(requestParams, handler);
}
private ensureCookie(requestParams: { headers?: {} }) {
if (this._authCookie && this._authCookie.expiryTime() > Date.now()) {
requestParams.headers = requestParams.headers || {};
requestParams.headers['cookie'] = `${this._authCookie.key}=${this._authCookie.value}`;
}
}
private doSendRequest(requestParams: any, callback: (error: HdfsError, response: any) => void): void {
request(requestParams, (error, response, body) => {
if (!callback) { return; }
if (error || this.isError(response)) {
let hdfsError = this.parseError(response, body, error);
callback(hdfsError, response);
} else if (this.isSuccess(response)) {
}
else if (this.isSuccess(response)) {
callback(undefined, response);
} else {
let hdfsError = new HdfsError(
localize('webhdfs.unexpectedRedirect', 'Unexpected Redirect'),
response && response.statusCode,
response && response.statusMessage,
this.getRemoteExceptionMessage(body || response.body),
error
);
}
else {
let hdfsError = new HdfsError(localize('webhdfs.unexpectedRedirect', "Unexpected Redirect"), response && response.statusCode, response && response.statusMessage, this.getRemoteExceptionMessage(body || response.body), error);
callback(hdfsError, response);
}
});
}
/**
* Authenticates using kerberos as part of a request, and saves cookie if successful.
* Ideally would use request's built-in cookie functionality but this isn't working with non-public domains.
* Instead, save the cookie in this module and reuse if not expired
*/
private requestWithKerberosSync(requestParams: any, callback: (error: HdfsError, response: request.Response) => void) {
this.setKerberosAuthOnParams(requestParams).then(() => {
this.doSendRequest(requestParams, (error, response) => {
if (error) {
// Pass on the callback
callback(error, response);
}
else {
// Capture cookie for future requests
this.setAuthCookie(response);
callback(error, response);
}
});
}).catch((err) => {
callback(err, undefined);
});
}
private async setKerberosAuthOnParams(requestParams: any): Promise<void> {
let kerberosToken = await auth.authenticateKerberos(this._opts.host);
requestParams.headers = { Authorization: `Negotiate ${kerberosToken}` };
return requestParams;
}
private setAuthCookie(response: request.Response) {
try {
if (response && response.headers && response.headers['set-cookie']) {
let cookies: Cookie[];
if (response.headers['set-cookie'] instanceof Array) {
cookies = response.headers['set-cookie'].map(c => Cookie.parse(c));
}
else {
cookies = [Cookie.parse(response.headers['set-cookie'])];
}
this._authCookie = cookies[0];
}
} catch { }
}
/**
* Change file permissions
* @returns void
@@ -457,17 +517,6 @@ export class WebHDFS {
public createWriteStream(path: string, append?: boolean, opts?: object): fs.WriteStream {
this.checkArgDefined('path', path);
let emitError = (instance, err) => {
const isErrorEmitted = instance.errorEmitted;
if (!isErrorEmitted) {
instance.emit('error', err);
instance.emit('finish');
}
instance.errorEmitted = true;
};
let endpoint = this.getOperationEndpoint(
append ? 'append' : 'create',
path,
@@ -480,8 +529,6 @@ export class WebHDFS {
)
);
let stream = undefined;
let canResume: boolean = true;
let params: any = Object.assign(
{
method: append ? 'POST' : 'PUT',
@@ -493,43 +540,68 @@ export class WebHDFS {
params.headers = params.headers || {};
params.headers['content-type'] = 'application/octet-stream';
if (!this._requestParams.isKerberos) {
return this.doCreateWriteStream(params);
}
// Else, must add kerberos token and handle redirects
params.followRedirect = false;
let replyStream = through();
let handleErr = (err) => {
replyStream.emit('error', err);
replyStream.end();
};
let initRedirectedStream = () => {
let redirectedStream = this.doCreateWriteStream(params);
replyStream.pipe(redirectedStream);
};
this.requestWithRedirectAndAuth(params, initRedirectedStream, handleErr);
return <fs.WriteStream><any>replyStream;
}
private doCreateWriteStream(params: any): fs.WriteStream {
let emitError = (instance, err) => {
const isErrorEmitted = instance.errorEmitted;
if (!isErrorEmitted) {
instance.emit('error', err);
instance.emit('finish');
}
instance.errorEmitted = true;
};
let canResume: boolean = true;
let stream = undefined;
let req = request(params, (error, response, body) => {
// Handle redirect only if there was not an error (e.g. res is defined)
if (response && this.isRedirect(response)) {
let upload = request(
Object.assign(params, { url: response.headers.location }),
(err, res, bo) => {
if (err || this.isError(res)) {
emitError(req, this.parseError(res, bo, err));
req.end();
} else if (res.headers.hasOwnProperty('location')) {
req.emit('finish', res.headers.location);
} else {
req.emit('finish');
}
let upload = request(Object.assign(params, { url: response.headers.location }), (err, res, bo) => {
if (err || this.isError(res)) {
emitError(req, this.parseError(res, bo, err));
req.end();
}
);
else if (res.headers.hasOwnProperty('location')) {
req.emit('finish', res.headers.location);
}
else {
req.emit('finish');
}
});
canResume = true; // Enable resume
stream.pipe(upload);
stream.resume();
}
if (error && !response) {
// request failed, and req is not accessible in this case.
throw this.parseError(undefined, undefined, error);
}
if (error || this.isError(response)) {
emitError(req, this.parseError(response, body, error));
}
});
req.on('pipe', (src) => {
// Pause read stream
stream = src;
stream.pause();
// This is not an elegant solution but here we go
// Basically we don't allow pipe() method to resume reading input
// and set internal _readableState.flowing to false
@@ -539,13 +611,11 @@ export class WebHDFS {
stream._readableState.flowing = false;
}
});
// Unpipe initial request
src.unpipe(req);
req.end();
});
return <fs.WriteStream><any>req;
return <fs.WriteStream><any> req;
}
/**
@@ -575,7 +645,7 @@ export class WebHDFS {
this.checkArgDefined('path', path);
let endpoint = this.getOperationEndpoint('open', path, opts);
let params = Object.assign(
let params: request.OptionsWithUrl = Object.assign(
{
method: 'GET',
url: endpoint,
@@ -583,13 +653,43 @@ export class WebHDFS {
},
this._requestParams
);
if (!this._requestParams.isKerberos) {
return <fs.ReadStream><any>this.doCreateReadStream(params);
}
// Else, must add kerberos token and handle redirects
params.followRedirect = false;
let replyStream = through();
let handleErr = (err) => {
replyStream.emit('error', err);
replyStream.end();
};
let initRedirectedStream = () => {
let redirectedStream = this.doCreateReadStream(params);
redirectedStream.pipe(replyStream);
};
this.requestWithRedirectAndAuth(params, initRedirectedStream, handleErr);
return <fs.ReadStream><any>replyStream;
}
private requestWithRedirectAndAuth(params: request.OptionsWithUrl, onRedirected: () => void, handleErr: (err: any) => void) {
this.requestWithKerberosSync(params, (err, response: request.Response) => {
if (err && err.statusCode === 307 && response.headers['location']) {
// It's a redirect
params.url = response.headers['location'];
this.setKerberosAuthOnParams(params)
.then(onRedirected)
.catch(handleErr);
}
});
}
private doCreateReadStream(params: request.OptionsWithUrl): fs.ReadStream {
let req: request.Request = request(params);
req.on('complete', (response) => {
req.emit('finish');
});
req.on('response', (response) => {
// Handle remote exceptions
// Remove all data handlers and parse error data
@@ -599,19 +699,17 @@ export class WebHDFS {
req.emit('error', this.parseError(response, data.toString()));
req.end();
});
} else if (this.isRedirect(response)) {
}
else if (this.isRedirect(response)) {
let download = request(params);
download.on('complete', (response) => {
req.emit('finish');
});
// Proxy data to original data handler
// Not the nicest way but hey
download.on('data', (dataChunk) => {
req.emit('data', dataChunk);
});
// Handle subrequest
download.on('response', (response) => {
if (this.isError(response)) {
@@ -623,11 +721,9 @@ export class WebHDFS {
}
});
}
// No need to interrupt the request
// data will be automatically sent to the data handler
});
return <fs.ReadStream><any>req;
}