Clean up some of the extensions (#8267)

* 💄

* prune unused code

* more cleanup

* remove abunch of used code
This commit is contained in:
Anthony Dresser
2019-11-08 11:44:43 -08:00
committed by GitHub
parent 738ca479e4
commit 7f7052ad42
94 changed files with 366 additions and 1158 deletions

View File

@@ -290,7 +290,7 @@ export function parseAclList(aclString: string): AclEntry[] {
* assumes the string has already been checked for validity.
* @param aclString The string representation of the ACL entry
*/
export function parseAclEntry(aclString: string): AclEntry {
function parseAclEntry(aclString: string): AclEntry {
const parts: string[] = aclString.split(':');
let i = 0;
const scope: AclEntryScope = parts.length === 4 && parts[i++] === 'default' ? AclEntryScope.default : AclEntryScope.access;

View File

@@ -15,19 +15,20 @@ import { PermissionStatus, AclEntry, parseAclList, PermissionType, parseAclPermi
import { Mount } from './mount';
import { everyoneName, ownerPostfix, owningGroupPostfix } from '../localizedConstants';
import { FileStatus, parseHdfsFileType } from './fileStatus';
import { Readable, Transform } from 'stream';
const localize = nls.loadMessageBundle();
const ErrorMessageInvalidDataStructure = localize('webhdfs.invalidDataStructure', "Invalid Data Structure");
const emitError = (instance, err) => {
const isErrorEmitted = instance.errorEmitted;
const emitError = (instance: request.Request | Transform, err: any) => {
const isErrorEmitted = (instance as any).errorEmitted;
if (!isErrorEmitted) {
instance.emit('error', err);
instance.emit('finish');
}
instance.errorEmitted = true;
(instance as any).errorEmitted = true;
};
export class WebHDFS {
@@ -41,7 +42,7 @@ export class WebHDFS {
}
let missingProps = ['host', 'port', 'path']
.filter(p => !opts.hasOwnProperty(p) || !opts[p]);
.filter((p: keyof IHdfsOptions) => !opts.hasOwnProperty(p) || !opts[p]);
if (missingProps && missingProps.length > 0) {
throw new Error(localize('webhdfs.missingProperties',
"Unable to create WebHDFS client due to missing options: ${0}", missingProps.join(', ')));
@@ -224,7 +225,7 @@ export class WebHDFS {
);
this.ensureCookie(requestParams);
// Add a wrapper to handle unauthorized requests by adding kerberos auth steps
let handler = (error, response) => {
let handler = (error: any, response: request.Response) => {
if (error && error.statusCode === 401 && this._requestParams.isKerberos) {
this.requestWithKerberosSync(requestParams, callback);
} else {
@@ -234,7 +235,7 @@ export class WebHDFS {
this.doSendRequest(requestParams, handler);
}
private ensureCookie(requestParams: { headers?: {} }) {
private ensureCookie(requestParams: { headers?: { [key: string]: string } }) {
if (this._authCookie && this._authCookie.expiryTime() > Date.now()) {
requestParams.headers = requestParams.headers || {};
requestParams.headers['cookie'] = `${this._authCookie.key}=${this._authCookie.value}`;
@@ -242,7 +243,7 @@ export class WebHDFS {
}
private doSendRequest(requestParams: any, callback: (error: HdfsError, response: any) => void): void {
request(requestParams, (error, response, body) => {
request(requestParams, (error: any, response: request.Response, body: any) => {
if (error || this.isError(response)) {
let hdfsError = this.parseError(response, body, error);
callback(hdfsError, response);
@@ -486,7 +487,7 @@ export class WebHDFS {
// Unknown type - just ignore since we don't currently support the other types
return;
}
e.getAllPermissions().forEach( sp => {
e.getAllPermissions().forEach(sp => {
targetEntry.addPermission(sp.scope, sp.permission);
});
});
@@ -727,7 +728,7 @@ export class WebHDFS {
cb();
}
});
let handleErr = (err) => {
let handleErr = (err: any) => {
replyStream.emit('error', err);
replyStream.end();
};
@@ -735,7 +736,7 @@ export class WebHDFS {
// After redirect, create valid stream to correct location
// and pipe the intermediate stream to it, unblocking the data flow
params.headers['content-type'] = 'application/octet-stream';
let upload = request(params, (err, res, bo) => {
let upload = request(params, (err: any, res: request.Response, bo: any) => {
if (err || this.isError(res)) {
emitError(replyStream, this.parseError(res, bo, err));
replyStream.end();
@@ -760,11 +761,11 @@ export class WebHDFS {
private doCreateWriteStream(params: any): fs.WriteStream {
let canResume: boolean = true;
let stream = undefined;
let req = request(params, (error, response, body) => {
let stream: Readable;
let req = request(params, (error: any, response: request.Response, body: any) => {
// Handle redirect only if there was not an error (e.g. res is defined)
if (response && this.isRedirect(response)) {
let upload = request(Object.assign(params, { url: response.headers.location }), (err, res, bo) => {
let upload = request(Object.assign(params, { url: response.headers.location }), (err: any, res: request.Response, bo: any) => {
if (err || this.isError(res)) {
emitError(req, this.parseError(res, bo, err));
req.end();
@@ -784,7 +785,7 @@ export class WebHDFS {
emitError(req, this.parseError(response, body, error));
}
});
req.on('pipe', (src) => {
req.on('pipe', (src: Readable) => {
// Pause read stream
stream = src;
stream.pause();
@@ -794,7 +795,7 @@ export class WebHDFS {
canResume = false;
stream.on('resume', () => {
if (!canResume) {
stream._readableState.flowing = false;
(stream as any)._readableState.flowing = false; // i guess we are unsafely accessing this
}
});
// Unpipe initial request
@@ -845,7 +846,7 @@ export class WebHDFS {
// Else, must add kerberos token and handle redirects
params.followRedirect = false;
let replyStream = through();
let handleErr = (err) => {
let handleErr = (err: any) => {
replyStream.emit('error', err);
replyStream.end();
};
@@ -960,7 +961,7 @@ export class WebHDFS {
this.unlink(path, recursive, callback);
}
public static createClient(opts, requestParams): WebHDFS {
public static createClient(opts: IHdfsOptions, requestParams: IRequestParams): WebHDFS {
return new WebHDFS(
Object.assign(
{