Kusto Client Cleanup (#1102)

* Cleaned up KustoClient with formatting and switched logic between ExecuteCommandAsync and ExecuteQueryAsync to match their respective names.

* Refactored ExecuteControlCommandAsync to call ExecuteControlCommandAsync in KustoAdminProvider. Fixed spacing in GetClientRequestProperties

* Refactored ExecuteQueryAsync to not use Task.Run
This commit is contained in:
Justin M
2020-11-17 10:26:09 -08:00
committed by GitHub
parent 32d7a63d7c
commit 30d600e423
2 changed files with 61 additions and 41 deletions

View File

@@ -22,14 +22,14 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
/// <summary> /// <summary>
/// Executes a query or command against a kusto cluster and returns a sequence of result row instances. /// Executes a query or command against a kusto cluster and returns a sequence of result row instances.
/// </summary> /// </summary>
Task<IEnumerable<T>> ExecuteControlCommandAsync<T>(string command, bool throwOnError, CancellationToken cancellationToken); Task ExecuteControlCommandAsync(string command, bool throwOnError, int retryCount = 1);
/// <summary> /// <summary>
/// Executes a query. /// Executes a query.
/// </summary> /// </summary>
/// <param name="query">The query.</param> /// <param name="query">The query.</param>
/// <returns>The results.</returns> /// <returns>The results.</returns>
Task<IDataReader> ExecuteQueryAsync(string query, CancellationToken cancellationToken, string databaseName = null); Task<IEnumerable<T>> ExecuteQueryAsync<T>(string query, CancellationToken cancellationToken, string databaseName = null);
/// <summary> /// <summary>
/// Executes a Kusto control command. /// Executes a Kusto control command.

View File

@@ -49,21 +49,23 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
private GlobalState LoadSchemaState() private GlobalState LoadSchemaState()
{ {
CancellationTokenSource source = new CancellationTokenSource();
IEnumerable<ShowDatabaseSchemaResult> tableSchemas = Enumerable.Empty<ShowDatabaseSchemaResult>(); IEnumerable<ShowDatabaseSchemaResult> tableSchemas = Enumerable.Empty<ShowDatabaseSchemaResult>();
IEnumerable<ShowFunctionsResult> functionSchemas = Enumerable.Empty<ShowFunctionsResult>(); IEnumerable<ShowFunctionsResult> functionSchemas = Enumerable.Empty<ShowFunctionsResult>();
Parallel.Invoke(() => if (!string.IsNullOrWhiteSpace(DatabaseName))
{ {
tableSchemas = ExecuteControlCommandAsync<ShowDatabaseSchemaResult>( var source = new CancellationTokenSource();
$".show database {DatabaseName} schema", Parallel.Invoke(() =>
false, source.Token).Result; {
}, () => tableSchemas =
{ ExecuteQueryAsync<ShowDatabaseSchemaResult>($".show database {DatabaseName} schema", source.Token, DatabaseName)
functionSchemas = ExecuteControlCommandAsync<ShowFunctionsResult>(".show functions", false, .Result;
source.Token).Result; },
}); () =>
{
functionSchemas = ExecuteQueryAsync<ShowFunctionsResult>(".show functions", source.Token, DatabaseName).Result;
});
}
return KustoIntellisenseHelper.AddOrUpdateDatabase(tableSchemas, functionSchemas, return KustoIntellisenseHelper.AddOrUpdateDatabase(tableSchemas, functionSchemas,
GlobalState.Default, GlobalState.Default,
@@ -146,7 +148,8 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
return kcsb; return kcsb;
} }
private ClientRequestProperties GetCLientRequestProperties(CancellationToken cancellationToken){ private ClientRequestProperties GetClientRequestProperties(CancellationToken cancellationToken)
{
var clientRequestProperties = new ClientRequestProperties var clientRequestProperties = new ClientRequestProperties
{ {
ClientRequestId = Guid.NewGuid().ToString() ClientRequestId = Guid.NewGuid().ToString()
@@ -170,18 +173,22 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
{ {
var minimalQuery = var minimalQuery =
codeBlock.Service.GetMinimalText(MinimalTextKind.RemoveLeadingWhitespaceAndComments); codeBlock.Service.GetMinimalText(MinimalTextKind.RemoveLeadingWhitespaceAndComments);
if(!string.IsNullOrEmpty(minimalQuery)){ // Query is empty in case of comments
IDataReader origReader;
var clientRequestProperties = GetCLientRequestProperties(cancellationToken);
if(minimalQuery.StartsWith(".") && !minimalQuery.StartsWith(".show")){ if (!string.IsNullOrEmpty(minimalQuery))
{
// Query is empty in case of comments
IDataReader origReader;
var clientRequestProperties = GetClientRequestProperties(cancellationToken);
if (minimalQuery.StartsWith(".") && !minimalQuery.StartsWith(".show"))
{
origReader = _kustoAdminProvider.ExecuteControlCommand( origReader = _kustoAdminProvider.ExecuteControlCommand(
KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName, KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName,
minimalQuery, minimalQuery,
clientRequestProperties); clientRequestProperties);
} }
else{ else
{
origReader = _kustoQueryProvider.ExecuteQuery( origReader = _kustoQueryProvider.ExecuteQuery(
KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName, KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName,
minimalQuery, minimalQuery,
@@ -193,20 +200,20 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
} }
}); });
if (numOfQueries == 0 && origReaders.Length > 0) // Covers the scenario when user tries to run comments. if (numOfQueries == 0 && origReaders.Length > 0) // Covers the scenario when user tries to run comments.
{ {
var clientRequestProperties = GetCLientRequestProperties(cancellationToken); var clientRequestProperties = GetClientRequestProperties(cancellationToken);
origReaders[0] = _kustoQueryProvider.ExecuteQuery( origReaders[0] = _kustoQueryProvider.ExecuteQuery(
KustoQueryUtils.IsClusterLevelQuery(query) ? "" : databaseName, KustoQueryUtils.IsClusterLevelQuery(query) ? "" : databaseName,
query, query,
clientRequestProperties); clientRequestProperties);
} }
return new KustoResultsReader(origReaders); return new KustoResultsReader(origReaders);
} }
catch (AggregateException exception) catch (AggregateException exception)
when (retryCount > 0 && when (retryCount > 0 &&
exception.InnerException is KustoRequestException innerException exception.InnerException is KustoRequestException innerException
&& innerException.FailureCode == 401) // Unauthorized && innerException.FailureCode == 401) // Unauthorized
{ {
RefreshAzureToken(); RefreshAzureToken();
@@ -214,23 +221,28 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
return ExecuteQuery(query, cancellationToken, databaseName, retryCount); return ExecuteQuery(query, cancellationToken, databaseName, retryCount);
} }
} }
/// <summary> /// <summary>
/// Executes a query or command against a kusto cluster and returns a sequence of result row instances. /// Executes a query or command against a kusto cluster and returns a sequence of result row instances.
/// </summary> /// </summary>
public async Task<IEnumerable<T>> ExecuteControlCommandAsync<T>(string command, bool throwOnError, public async Task ExecuteControlCommandAsync(string command, bool throwOnError, int retryCount = 1)
CancellationToken cancellationToken)
{ {
ValidationUtils.IsArgumentNotNullOrWhiteSpace(command, nameof(command));
try try
{ {
var resultReader = await ExecuteQueryAsync(command, cancellationToken, DatabaseName); using (var adminOutput = await _kustoAdminProvider.ExecuteControlCommandAsync(DatabaseName, command))
var results = KustoDataReaderParser.ParseV1(resultReader, null); {
var tableReader = results[WellKnownDataSet.PrimaryResult].Single().TableData.CreateDataReader(); }
return new ObjectReader<T>(tableReader); }
catch (KustoRequestException exception) when (retryCount > 0 && exception.FailureCode == 401) // Unauthorized
{
RefreshAzureToken();
retryCount--;
await ExecuteControlCommandAsync(command, throwOnError, retryCount);
} }
catch (Exception) when (!throwOnError) catch (Exception) when (!throwOnError)
{ {
return null;
} }
} }
@@ -239,11 +251,19 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
/// </summary> /// </summary>
/// <param name="query">The query.</param> /// <param name="query">The query.</param>
/// <returns>The results.</returns> /// <returns>The results.</returns>
public Task<IDataReader> ExecuteQueryAsync(string query, CancellationToken cancellationToken, public async Task<IEnumerable<T>> ExecuteQueryAsync<T>(string query, CancellationToken cancellationToken, string databaseName = null)
string databaseName = null)
{ {
var reader = ExecuteQuery(query, cancellationToken, databaseName); try
return Task.FromResult(reader); {
var resultReader = ExecuteQuery(query, cancellationToken, databaseName);
var results = KustoDataReaderParser.ParseV1(resultReader, null);
var tableReader = results[WellKnownDataSet.PrimaryResult].Single().TableData.CreateDataReader();
return await Task.FromResult(new ObjectReader<T>(tableReader));
}
catch (Exception)
{
return null;
}
} }
private void CancelQuery(string clientRequestId) private void CancelQuery(string clientRequestId)
@@ -263,7 +283,7 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
try try
{ {
using (var adminOutput = _kustoAdminProvider.ExecuteControlCommand(command, null)) using (var adminOutput = _kustoAdminProvider.ExecuteControlCommand(command))
{ {
} }
} }