3644 Kusto Azure Token Refresh (#1081)

* 3644 Created KustoClient to handle interaction with Kusto servers. Refactored KustoDataSource and moved execution logic into KustoClient.

* 3644 Added RequestSecurityTokenParams, RequestSecurityTokenResponse, and SecurityTokenRequest to Kusto. Moved intellisense functions from KustoDataSource to KustoIntellisenseHelper. Added SchemaState to readonly property on KustoDataSource. Added catch block to Batch.cs to catch KustoRequestExceptions.

* 3644 Removed unused reference from ConnectionDetails and ConnectedBindingContext. Added UpdateAzureToken function to IKustoClient, KustoClient, IDataSource, KustoDataSource, and KustoDataSource. Added dataSource.Dispose to ReliableDataSourceConnection > Close. Added RefreshAzureToken to ConnectionService to refresh azure token.

* 3644 Removed unused properties from RequestSecurityTokenParams and RequestSecurityTokenResponse

* 3644 Added default to DbColumnWrapper to UnknownTypeName when null. Moved database query logic from KustoIntellisenseHelper to KustoClient. Moved KustoIntellisenseHelper data objects out of class. Changed SchemaState to load through separate tasks.

* 3644 Changed ReRunQuery logic in Kusto Batch to flip back to false if the query fails a second time so it can be rejected

* 3644 Updated GetAutoCompleteSuggestions and GetHoverHelp with changes from main

* 3644 Added AccountId to RequestSecurityTokenParams and set value in ConnectionService. Added throw to Batch.cs to ensure exceptions that are non-Unauthorized bubble up to ADS.

* 3644 Changed KustoUnauthorizedException to take original exception as inner exception. Changed catch block to only throw KustoUnauthorizedException when FailureCode is 401

* 3644 Renamed KustoUnauthorizedException to DataSourceUnauthorizedException. Moved logic to throw exception down into KustoClient. Changed retryLogic in Batch.cs to a decrementing count

* 3644 Changed logic in Batch.cs for throwing InvalidOperationException
This commit is contained in:
Justin M
2020-09-28 13:24:23 -07:00
committed by GitHub
parent d6ff73d510
commit d061e781f4
23 changed files with 673 additions and 487 deletions

View File

@@ -0,0 +1,260 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Kusto.Cloud.Platform.Data;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Data;
using Kusto.Data.Exceptions;
using Kusto.Data.Net.Client;
using Kusto.Language;
using Kusto.Language.Editor;
using Microsoft.Data.SqlClient;
using Microsoft.Kusto.ServiceLayer.DataSource.DataSourceIntellisense;
using Microsoft.Kusto.ServiceLayer.DataSource.Exceptions;
using Microsoft.Kusto.ServiceLayer.Utility;
namespace Microsoft.Kusto.ServiceLayer.DataSource
{
public class KustoClient : IKustoClient
{
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private ICslAdminProvider _kustoAdminProvider;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private ICslQueryProvider _kustoQueryProvider;
/// <summary>
/// SchemaState used for getting intellisense info.
/// </summary>
public GlobalState SchemaState { get; private set; }
public string ClusterName { get; }
public string DatabaseName { get; private set; }
public KustoClient(string connectionString, string azureAccountToken)
{
ClusterName = GetClusterName(connectionString);
var databaseName = new SqlConnectionStringBuilder(connectionString).InitialCatalog;
Initialize(ClusterName, databaseName, azureAccountToken);
DatabaseName = string.IsNullOrWhiteSpace(databaseName) ? GetFirstDatabaseName() : databaseName;
SchemaState = LoadSchemaState();
}
private GlobalState LoadSchemaState()
{
CancellationTokenSource source = new CancellationTokenSource();
IEnumerable<ShowDatabaseSchemaResult> tableSchemas = Enumerable.Empty<ShowDatabaseSchemaResult>();
IEnumerable<ShowFunctionsResult> functionSchemas = Enumerable.Empty<ShowFunctionsResult>();
Parallel.Invoke(() =>
{
tableSchemas = ExecuteControlCommandAsync<ShowDatabaseSchemaResult>(
$".show database {DatabaseName} schema",
false, source.Token).Result;
}, () =>
{
functionSchemas = ExecuteControlCommandAsync<ShowFunctionsResult>(".show functions", false,
source.Token).Result;
});
return KustoIntellisenseHelper.AddOrUpdateDatabase(tableSchemas, functionSchemas,
GlobalState.Default,
DatabaseName, ClusterName);
}
private void Initialize(string clusterName, string databaseName, string azureAccountToken)
{
var stringBuilder = GetKustoConnectionStringBuilder(clusterName, databaseName, azureAccountToken, "", "");
_kustoQueryProvider = KustoClientFactory.CreateCslQueryProvider(stringBuilder);
_kustoAdminProvider = KustoClientFactory.CreateCslAdminProvider(stringBuilder);
}
public void UpdateAzureToken(string azureAccountToken)
{
_kustoQueryProvider.Dispose();
_kustoAdminProvider.Dispose();
Initialize(ClusterName, DatabaseName, azureAccountToken);
}
/// <summary>
/// Extracts the cluster name from the connectionstring. The string looks like the following:
/// "Data Source=clustername.kusto.windows.net;User ID=;Password=;Pooling=False;Application Name=azdata-GeneralConnection"
/// <summary>
/// <param name="connectionString">A connection string coming over the Data management protocol</param>
private string GetClusterName(string connectionString)
{
var csb = new SqlConnectionStringBuilder(connectionString);
// If there is no https:// prefix, add it
Uri uri;
if ((Uri.TryCreate(csb.DataSource, UriKind.Absolute, out uri) ||
Uri.TryCreate("https://" + csb.DataSource, UriKind.Absolute, out uri)) &&
(uri.Scheme == Uri.UriSchemeHttp || uri.Scheme == Uri.UriSchemeHttps))
{
return uri.AbsoluteUri;
}
throw new ArgumentException("Expected a URL of the form clustername.kusto.windows.net");
}
private KustoConnectionStringBuilder GetKustoConnectionStringBuilder(string clusterName, string databaseName,
string userToken, string applicationClientId, string applicationKey)
{
ValidationUtils.IsNotNull(clusterName, nameof(clusterName));
ValidationUtils.IsTrue<ArgumentException>(
!string.IsNullOrWhiteSpace(userToken)
|| (!string.IsNullOrWhiteSpace(applicationClientId) && !string.IsNullOrWhiteSpace(applicationKey)),
$"the Kusto authentication is not specified - either set {nameof(userToken)}, or set {nameof(applicationClientId)} and {nameof(applicationKey)}");
var kcsb = new KustoConnectionStringBuilder
{
DataSource = clusterName,
// Perform federated auth based on the AAD user token, or based on the AAD application client id and key.
FederatedSecurity = true
};
if (!string.IsNullOrWhiteSpace(databaseName))
{
kcsb.InitialCatalog = databaseName;
}
if (!string.IsNullOrWhiteSpace(userToken))
{
kcsb.UserToken = userToken;
}
if (!string.IsNullOrWhiteSpace(applicationClientId))
{
kcsb.ApplicationClientId = applicationClientId;
}
if (!string.IsNullOrWhiteSpace(applicationKey))
{
kcsb.ApplicationKey = applicationKey;
}
return kcsb;
}
/// <summary>
/// Extracts the database name from the connectionString if it exists
/// otherwise it takes the first database name from the server
/// </summary>
/// <param name="connectionString"></param>
/// <returns>Database Name</returns>
private string GetFirstDatabaseName()
{
var source = new CancellationTokenSource();
string query = ".show databases | project DatabaseName";
using (var reader = ExecuteQuery(query, source.Token))
{
var rows = reader.ToEnumerable();
var row = rows?.FirstOrDefault();
return row?[0].ToString() ?? string.Empty;
}
}
public IDataReader ExecuteQuery(string query, CancellationToken cancellationToken, string databaseName = null)
{
ValidationUtils.IsArgumentNotNullOrWhiteSpace(query, nameof(query));
var clientRequestProperties = new ClientRequestProperties
{
ClientRequestId = Guid.NewGuid().ToString()
};
clientRequestProperties.SetOption(ClientRequestProperties.OptionNoTruncation, true);
cancellationToken.Register(() => CancelQuery(clientRequestProperties.ClientRequestId));
var kustoCodeService = new KustoCodeService(query);
var minimalQuery = kustoCodeService.GetMinimalText(MinimalTextKind.RemoveLeadingWhitespaceAndComments);
try
{
IDataReader origReader = _kustoQueryProvider.ExecuteQuery(
KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName,
minimalQuery,
clientRequestProperties);
return new KustoResultsReader(origReader);
}
catch (KustoRequestException exception) when (exception.FailureCode == 401) // Unauthorized
{
throw new DataSourceUnauthorizedException(exception);
}
}
/// <summary>
/// Executes a query or command against a kusto cluster and returns a sequence of result row instances.
/// </summary>
public async Task<IEnumerable<T>> ExecuteControlCommandAsync<T>(string command, bool throwOnError,
CancellationToken cancellationToken)
{
try
{
var resultReader = await ExecuteQueryAsync(command, cancellationToken, DatabaseName);
var results = KustoDataReaderParser.ParseV1(resultReader, null);
var tableReader = results[WellKnownDataSet.PrimaryResult].Single().TableData.CreateDataReader();
return new ObjectReader<T>(tableReader);
}
catch (DataSourceUnauthorizedException)
{
throw;
}
catch (Exception) when (!throwOnError)
{
return null;
}
}
/// <summary>
/// Executes a query.
/// </summary>
/// <param name="query">The query.</param>
/// <returns>The results.</returns>
public Task<IDataReader> ExecuteQueryAsync(string query, CancellationToken cancellationToken,
string databaseName = null)
{
var reader = ExecuteQuery(query, cancellationToken, databaseName);
return Task.FromResult(reader);
}
private void CancelQuery(string clientRequestId)
{
var query = $".cancel query \"{clientRequestId}\"";
ExecuteControlCommand(query);
}
/// <summary>
/// Executes a Kusto control command.
/// </summary>
/// <param name="command">The command.</param>
public void ExecuteControlCommand(string command)
{
ValidationUtils.IsArgumentNotNullOrWhiteSpace(command, nameof(command));
using (var adminOutput = _kustoAdminProvider.ExecuteControlCommand(command, null))
{
}
}
public void UpdateDatabase(string databaseName)
{
DatabaseName = databaseName;
SchemaState = LoadSchemaState();
}
public void Dispose()
{
_kustoQueryProvider.Dispose();
_kustoAdminProvider.Dispose();
}
}
}