Add support for low-privileged user to run spark notebooks (#12883)

* Add support for low-privileged user to run spark notebooks

* error

* fix test
This commit is contained in:
Charles Gagnon
2020-10-13 09:29:00 -07:00
committed by GitHub
parent 10d3a6b2ba
commit 1c279675c8
19 changed files with 287 additions and 121 deletions

View File

@@ -14,7 +14,7 @@ import * as constants from '../common/constants';
import * as localizedConstants from '../common/localizedConstants';
import { JupyterServerInstallation } from './jupyterServerInstallation';
import * as utils from '../common/utils';
import { IPrompter, IQuestion, confirm } from '../prompts/question';
import { IPrompter, IQuestion, QuestionTypes } from '../prompts/question';
import { AppContext } from '../common/appContext';
import { LocalJupyterServerManager, ServerInstanceFactory } from './jupyterServerManager';
@@ -28,7 +28,6 @@ import { LocalPipPackageManageProvider } from './localPipPackageManageProvider';
import { LocalCondaPackageManageProvider } from './localCondaPackageManageProvider';
import { ManagePackagesDialogModel, ManagePackageDialogOptions } from '../dialog/managePackages/managePackagesDialogModel';
import { PyPiClient } from './pypiClient';
import { IconPathHelper } from '../common/iconHelper';
let untitledCounter = 0;
@@ -58,7 +57,6 @@ export class JupyterController {
this.extensionContext.extensionPath,
this.appContext.outputChannel);
await this._jupyterInstallation.configurePackagePaths();
IconPathHelper.setExtensionContext(this.extensionContext);
// Add command/task handlers
azdata.tasks.registerTask(constants.jupyterOpenNotebookTask, (profile: azdata.IConnectionProfile) => {
@@ -183,7 +181,7 @@ export class JupyterController {
//Confirmation message dialog
private async confirmReinstall(): Promise<boolean> {
return await this.prompter.promptSingle<boolean>(<IQuestion>{
type: confirm,
type: QuestionTypes.confirm,
message: localize('confirmReinstall', "Are you sure you want to reinstall?"),
default: true
});

View File

@@ -3,7 +3,7 @@
* Licensed under the Source EULA. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { nb, ServerInfo, connection, IConnectionProfile } from 'azdata';
import { nb, ServerInfo, connection, IConnectionProfile, credentials } from 'azdata';
import { Session, Kernel } from '@jupyterlab/services';
import * as fs from 'fs-extra';
import * as nls from 'vscode-nls';
@@ -18,6 +18,10 @@ import { Deferred } from '../common/promise';
import { JupyterServerInstallation } from './jupyterServerInstallation';
import * as bdc from 'bdc';
import { noBDCConnectionError, providerNotValidError } from '../common/localizedConstants';
import { SQL_PROVIDER, CONTROLLER_ENDPOINT, KNOX_ENDPOINT_GATEWAY, KNOX_ENDPOINT_SERVER, KNOX_ENDPOINT_PORT } from '../common/constants';
import CodeAdapter from '../prompts/adapter';
import { IQuestion, QuestionTypes } from '../prompts/question';
import { ExtensionContextHelper } from '../common/extensionContextHelper';
const configBase = {
'kernel_python_credentials': {
@@ -55,15 +59,6 @@ const configBase = {
}
};
const KNOX_ENDPOINT_SERVER = 'host';
const KNOX_ENDPOINT_PORT = 'knoxport';
const KNOX_ENDPOINT_GATEWAY = 'gateway';
const CONTROLLER_ENDPOINT = 'controller';
const SQL_PROVIDER = 'MSSQL';
const USER = 'user';
const AUTHTYPE = 'authenticationType';
const INTEGRATED_AUTH = 'integrated';
export class JupyterSessionManager implements nb.SessionManager {
private _ready: Deferred<void>;
private _isReady: boolean;
@@ -183,7 +178,11 @@ export class JupyterSession implements nb.ISession {
private _kernel: nb.IKernel;
private _messagesComplete: Deferred<void> = new Deferred<void>();
constructor(private sessionImpl: Session.ISession, private _installation: JupyterServerInstallation, skipSettingEnvironmentVars?: boolean, private _pythonEnvVarPath?: string) {
constructor(
private sessionImpl: Session.ISession,
private _installation: JupyterServerInstallation,
skipSettingEnvironmentVars?: boolean,
private _pythonEnvVarPath?: string) {
this.setEnvironmentVars(skipSettingEnvironmentVars).catch(error => {
console.error(`Unexpected exception setting Jupyter Session variables : ${error}`);
// We don't want callers to hang forever waiting - it's better to continue on even if we weren't
@@ -275,53 +274,74 @@ export class JupyterSession implements nb.ISession {
}
public async configureConnection(connectionProfile: IConnectionProfile): Promise<void> {
if (connectionProfile && connectionProfile.providerName && this.isSparkKernel(this.sessionImpl.kernel.name)) {
if (connectionProfile && connectionProfile.providerName && utils.isSparkKernel(this.sessionImpl.kernel.name)) {
// %_do_not_call_change_endpoint is a SparkMagic command that lets users change endpoint options,
// such as user/profile/host name/auth type
let credentials;
if (!this.isIntegratedAuth(connectionProfile)) {
credentials = await connection.getCredentials(connectionProfile.id);
}
let knoxUsername = connectionProfile.userName || 'root';
let knoxPassword: string = '';
//Update server info with bigdata endpoint - Unified Connection
if (connectionProfile.providerName === SQL_PROVIDER) {
const endpoints = await this.getClusterEndpoints(connectionProfile.id);
const gatewayEndpoint: utils.IEndpoint = endpoints?.find(ep => ep.serviceName.toLowerCase() === KNOX_ENDPOINT_GATEWAY);
if (!gatewayEndpoint) {
const serverInfo: ServerInfo = await connection.getServerInfo(connectionProfile.id);
if (!serverInfo?.options['isBigDataCluster']) {
throw new Error(noBDCConnectionError);
}
const endpoints = utils.getClusterEndpoints(serverInfo);
const controllerEndpoint = endpoints.find(ep => ep.name.toLowerCase() === CONTROLLER_ENDPOINT);
// root is the default username for pre-CU5 instances, so while we prefer to use the connection username
// as a default now we'll still fall back to root if it's empty for some reason. (but the calls below should
// get the actual correct value regardless)
let clusterController: bdc.IClusterController | undefined = undefined;
if (!utils.isIntegratedAuth(connectionProfile)) {
// See if the controller creds have been saved already, otherwise fall back to using
// SQL creds as a default
const credentialProvider = await credentials.getProvider('notebook.bdc.password');
const usernameKey = `notebook.bdc.username::${connectionProfile.id}`;
const savedUsername = ExtensionContextHelper.extensionContext.globalState.get<string>(usernameKey) || connectionProfile.userName;
const connectionCreds = await connection.getCredentials(connectionProfile.id);
const savedPassword = (await credentialProvider.readCredential(connectionProfile.id)).password || connectionCreds.password;
clusterController = await getClusterController(controllerEndpoint.endpoint, 'basic', savedUsername, savedPassword);
// Now that we know that the username/password are valid store them for use later on with the same connection
await credentialProvider.saveCredential(connectionProfile.id, clusterController.password);
await ExtensionContextHelper.extensionContext.globalState.update(usernameKey, clusterController.username);
knoxPassword = clusterController.password;
try {
knoxUsername = await clusterController.getKnoxUsername(clusterController.username);
} catch (err) {
knoxUsername = clusterController.username;
console.log(`Unexpected error getting Knox username for Spark kernel: ${err}`);
}
} else {
clusterController = await getClusterController(controllerEndpoint.endpoint, 'integrated');
}
let gatewayEndpoint: bdc.IEndpointModel = endpoints?.find(ep => ep.name.toLowerCase() === KNOX_ENDPOINT_GATEWAY);
if (!gatewayEndpoint) {
// User doesn't have permission to see the gateway endpoint from the DMV so we need to query the controller instead
const allEndpoints = (await clusterController.getEndPoints()).endPoints;
gatewayEndpoint = allEndpoints?.find(ep => ep.name.toLowerCase() === KNOX_ENDPOINT_GATEWAY);
if (!gatewayEndpoint) {
throw new Error(localize('notebook.couldNotFindKnoxGateway', "Could not find Knox gateway endpoint"));
}
}
let gatewayHostAndPort = utils.getHostAndPortFromEndpoint(gatewayEndpoint.endpoint);
connectionProfile.options[KNOX_ENDPOINT_SERVER] = gatewayHostAndPort.host;
connectionProfile.options[KNOX_ENDPOINT_PORT] = gatewayHostAndPort.port;
// root is the default username for pre-CU5 instances, so while we prefer to use the connection username
// as a default now we'll still fall back to root if it's empty for some reason. (but the calls below should
// get the actual correct value regardless)
connectionProfile.options[USER] = connectionProfile.userName || 'root';
if (!this.isIntegratedAuth(connectionProfile)) {
try {
const bdcApi = <bdc.IExtension>await vscode.extensions.getExtension(bdc.constants.extensionName).activate();
const controllerEndpoint = endpoints.find(ep => ep.serviceName.toLowerCase() === CONTROLLER_ENDPOINT);
const controller = bdcApi.getClusterController(controllerEndpoint.endpoint, 'basic', connectionProfile.userName, credentials.password);
connectionProfile.options[USER] = await controller.getKnoxUsername(connectionProfile.userName);
} catch (err) {
console.log(`Unexpected error getting Knox username for Spark kernel: ${err}`);
// Optimistically use the SQL login name - that's going to normally be the case after CU5
connectionProfile.options[USER] = connectionProfile.userName;
}
}
}
else {
throw new Error(providerNotValidError);
}
this.setHostAndPort(':', connectionProfile);
this.setHostAndPort(',', connectionProfile);
utils.setHostAndPort(':', connectionProfile);
utils.setHostAndPort(',', connectionProfile);
let server = vscode.Uri.parse(utils.getLivyUrl(connectionProfile.options[KNOX_ENDPOINT_SERVER], connectionProfile.options[KNOX_ENDPOINT_PORT])).toString();
let doNotCallChangeEndpointParams: string;
if (this.isIntegratedAuth(connectionProfile)) {
if (utils.isIntegratedAuth(connectionProfile)) {
doNotCallChangeEndpointParams = `%_do_not_call_change_endpoint --server=${server} --auth=Kerberos`;
} else {
doNotCallChangeEndpointParams = `%_do_not_call_change_endpoint --username=${connectionProfile.options[USER]} --password=${credentials.password} --server=${server} --auth=Basic_Access`;
doNotCallChangeEndpointParams = `%_do_not_call_change_endpoint --username=${knoxUsername} --password=${knoxPassword} --server=${server} --auth=Basic_Access`;
}
let future = this.sessionImpl.kernel.requestExecute({
code: doNotCallChangeEndpointParams
@@ -330,26 +350,6 @@ export class JupyterSession implements nb.ISession {
}
}
private isIntegratedAuth(connection: IConnectionProfile): boolean {
	// A connection uses integrated (Kerberos) auth when its auth-type option matches the
	// integrated marker, compared case-insensitively. A missing option means not integrated.
	return connection.options[AUTHTYPE]?.toLowerCase() === INTEGRATED_AUTH.toLowerCase();
}
private isSparkKernel(kernelName: string): boolean {
	// Any kernel whose name mentions "spark" (case-insensitive) is treated as a Spark kernel.
	return !!kernelName && kernelName.toLowerCase().includes('spark');
}
private setHostAndPort(delimeter: string, connection: IConnectionProfile): void {
let originalHost = connection.options[KNOX_ENDPOINT_SERVER];
if (!originalHost) {
return;
}
let index = originalHost.indexOf(delimeter);
if (index > -1) {
connection.options[KNOX_ENDPOINT_SERVER] = originalHost.slice(0, index);
connection.options[KNOX_ENDPOINT_PORT] = originalHost.slice(index + 1);
}
}
private updateConfig(config: ISparkMagicConfig, creds: ICredentials, homePath: string): void {
config.kernel_python_credentials = creds;
config.kernel_scala_credentials = creds;
@@ -358,14 +358,6 @@ export class JupyterSession implements nb.ISession {
config.ignore_ssl_errors = utils.getIgnoreSslVerificationConfigSetting();
}
private async getClusterEndpoints(profileId: string): Promise<utils.IEndpoint[]> {
let serverInfo: ServerInfo = await connection.getServerInfo(profileId);
if (!serverInfo || !serverInfo.options) {
return [];
}
return utils.getClusterEndpoints(serverInfo);
}
private async setEnvironmentVars(skip: boolean = false): Promise<void> {
// The PowerShell kernel doesn't define the %cd and %set_env magics; no need to run those here then
if (!skip && this.sessionImpl?.kernel?.name !== 'powershell') {
@@ -395,6 +387,55 @@ export class JupyterSession implements nb.ISession {
}
}
/**
 * Connects to the BDC cluster controller, validating the credentials by fetching the
 * cluster config. If the initial credentials are rejected, the user is prompted for a
 * username and password in a loop until either a connection succeeds or they cancel
 * out of a prompt.
 * @param controllerEndpoint The URL of the cluster controller endpoint
 * @param authType The authentication type to use ('basic' or 'integrated')
 * @param username The initial username to try for basic auth
 * @param password The initial password to try for basic auth
 * @returns The connected cluster controller
 * @throws If the user cancels a prompt without a controller connection being established
 */
async function getClusterController(controllerEndpoint: string, authType: bdc.AuthType, username?: string, password?: string): Promise<bdc.IClusterController | undefined> {
	const bdcApi = <bdc.IExtension>await vscode.extensions.getExtension(bdc.constants.extensionName).activate();
	const controller = bdcApi.getClusterController(
		controllerEndpoint,
		authType,
		username,
		password);
	try {
		// getClusterConfig doubles as a credential check - if it succeeds we're connected
		await controller.getClusterConfig();
		return controller;
	} catch (err) {
		// Initial username/password failed so prompt user for username password until either user
		// cancels out or we successfully connect
		console.log(`Error connecting to cluster controller: ${err}`);
		let errorMessage = '';
		const prompter = new CodeAdapter();
		while (true) {
			const newUsername = await prompter.promptSingle<string>(<IQuestion>{
				type: QuestionTypes.input,
				name: 'inputPrompt',
				message: localize('promptBDCUsername', "{0}Please provide the username to connect to the BDC Controller:", errorMessage),
				default: username
			});
			// Check the newly-entered value (not the original parameter) so that cancelling
			// the prompt is detected even when an initial username was supplied.
			if (!newUsername) {
				console.log(`User cancelled out of username prompt for BDC Controller`);
				break;
			}
			const newPassword = await prompter.promptSingle<string>(<IQuestion>{
				type: QuestionTypes.password,
				name: 'passwordPrompt',
				message: localize('promptBDCPassword', "Please provide the password to connect to the BDC Controller"),
				default: ''
			});
			// Same here - an empty prompt result means the user cancelled out.
			if (!newPassword) {
				console.log(`User cancelled out of password prompt for BDC Controller`);
				break;
			}
			const newController = bdcApi.getClusterController(controllerEndpoint, authType, newUsername, newPassword);
			try {
				await newController.getClusterConfig();
				return newController;
			} catch (connectErr) {
				// Surface the failure in the next username prompt and try again
				errorMessage = localize('bdcConnectError', "Error: {0}. ", connectErr.message ?? connectErr);
			}
		}
		throw new Error(localize('clusterControllerConnectionRequired', "A connection to the cluster controller is required to run Spark jobs"));
	}
}
// Credentials object assigned to the SparkMagic config's kernel credential entries
// (kernel_python_credentials / kernel_scala_credentials); only the endpoint URL is stored.
interface ICredentials {
'url': string;
}