From 30d600e423d85801d9acc509e4241af9b5fec70e Mon Sep 17 00:00:00 2001 From: Justin M <63619224+JustinMDotNet@users.noreply.github.com> Date: Tue, 17 Nov 2020 10:26:09 -0800 Subject: [PATCH] Kusto Client Cleanup (#1102) * Cleaned up KustoClient with formatting and switched logic between ExecuteCommandAsync and ExecuteQueryAsync to match their respective names. * Refactored ExecuteControlCommandAsync to call ExecuteControlCommandAsync in KustoAdminProvider. Fixed spacing in GetClientRequestProperties * Refactored ExecuteQueryAsync to not use Task.Run --- .../DataSource/IKustoClient.cs | 4 +- .../DataSource/KustoClient.cs | 98 +++++++++++-------- 2 files changed, 61 insertions(+), 41 deletions(-) diff --git a/src/Microsoft.Kusto.ServiceLayer/DataSource/IKustoClient.cs b/src/Microsoft.Kusto.ServiceLayer/DataSource/IKustoClient.cs index 6f3fc22e..de2bed90 100644 --- a/src/Microsoft.Kusto.ServiceLayer/DataSource/IKustoClient.cs +++ b/src/Microsoft.Kusto.ServiceLayer/DataSource/IKustoClient.cs @@ -22,14 +22,14 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource /// /// Executes a query or command against a kusto cluster and returns a sequence of result row instances. /// - Task> ExecuteControlCommandAsync(string command, bool throwOnError, CancellationToken cancellationToken); + Task ExecuteControlCommandAsync(string command, bool throwOnError, int retryCount = 1); /// /// Executes a query. /// /// The query. /// The results. - Task ExecuteQueryAsync(string query, CancellationToken cancellationToken, string databaseName = null); + Task> ExecuteQueryAsync(string query, CancellationToken cancellationToken, string databaseName = null); /// /// Executes a Kusto control command. diff --git a/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoClient.cs b/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoClient.cs index f8956650..8f3dae6a 100644 --- a/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoClient.cs +++ b/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoClient.cs @@ -49,21 +49,23 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource private GlobalState LoadSchemaState() { - CancellationTokenSource source = new CancellationTokenSource(); - IEnumerable tableSchemas = Enumerable.Empty(); IEnumerable functionSchemas = Enumerable.Empty(); - Parallel.Invoke(() => + if (!string.IsNullOrWhiteSpace(DatabaseName)) { - tableSchemas = ExecuteControlCommandAsync( - $".show database {DatabaseName} schema", - false, source.Token).Result; - }, () => - { - functionSchemas = ExecuteControlCommandAsync(".show functions", false, - source.Token).Result; - }); + var source = new CancellationTokenSource(); + Parallel.Invoke(() => + { + tableSchemas = + ExecuteQueryAsync($".show database {DatabaseName} schema", source.Token, DatabaseName) + .Result; + }, + () => + { + functionSchemas = ExecuteQueryAsync(".show functions", source.Token, DatabaseName).Result; + }); + } return KustoIntellisenseHelper.AddOrUpdateDatabase(tableSchemas, functionSchemas, GlobalState.Default, @@ -146,7 +148,8 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource return kcsb; } - private ClientRequestProperties GetCLientRequestProperties(CancellationToken cancellationToken){ + private ClientRequestProperties GetClientRequestProperties(CancellationToken cancellationToken) + { var clientRequestProperties = new ClientRequestProperties { ClientRequestId = Guid.NewGuid().ToString() @@ -170,18 +173,22 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource { var minimalQuery = codeBlock.Service.GetMinimalText(MinimalTextKind.RemoveLeadingWhitespaceAndComments); - - if(!string.IsNullOrEmpty(minimalQuery)){ // Query is empty in case of comments - IDataReader origReader; - var clientRequestProperties = GetCLientRequestProperties(cancellationToken); - if(minimalQuery.StartsWith(".") && !minimalQuery.StartsWith(".show")){ + if (!string.IsNullOrEmpty(minimalQuery)) + { + // Query is empty in case of comments + IDataReader origReader; + var clientRequestProperties = GetClientRequestProperties(cancellationToken); + + if (minimalQuery.StartsWith(".") && !minimalQuery.StartsWith(".show")) + { origReader = _kustoAdminProvider.ExecuteControlCommand( KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName, minimalQuery, clientRequestProperties); } - else{ + else + { origReader = _kustoQueryProvider.ExecuteQuery( KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName, minimalQuery, @@ -193,20 +200,20 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource } }); - if (numOfQueries == 0 && origReaders.Length > 0) // Covers the scenario when user tries to run comments. + if (numOfQueries == 0 && origReaders.Length > 0) // Covers the scenario when user tries to run comments. { - var clientRequestProperties = GetCLientRequestProperties(cancellationToken); + var clientRequestProperties = GetClientRequestProperties(cancellationToken); origReaders[0] = _kustoQueryProvider.ExecuteQuery( - KustoQueryUtils.IsClusterLevelQuery(query) ? "" : databaseName, - query, - clientRequestProperties); + KustoQueryUtils.IsClusterLevelQuery(query) ? "" : databaseName, + query, + clientRequestProperties); } - + return new KustoResultsReader(origReaders); } - catch (AggregateException exception) + catch (AggregateException exception) when (retryCount > 0 && - exception.InnerException is KustoRequestException innerException + exception.InnerException is KustoRequestException innerException && innerException.FailureCode == 401) // Unauthorized { RefreshAzureToken(); @@ -214,23 +221,28 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource return ExecuteQuery(query, cancellationToken, databaseName, retryCount); } } - + /// /// Executes a query or command against a kusto cluster and returns a sequence of result row instances. /// - public async Task> ExecuteControlCommandAsync(string command, bool throwOnError, - CancellationToken cancellationToken) + public async Task ExecuteControlCommandAsync(string command, bool throwOnError, int retryCount = 1) { + ValidationUtils.IsArgumentNotNullOrWhiteSpace(command, nameof(command)); + try { - var resultReader = await ExecuteQueryAsync(command, cancellationToken, DatabaseName); - var results = KustoDataReaderParser.ParseV1(resultReader, null); - var tableReader = results[WellKnownDataSet.PrimaryResult].Single().TableData.CreateDataReader(); - return new ObjectReader(tableReader); + using (var adminOutput = await _kustoAdminProvider.ExecuteControlCommandAsync(DatabaseName, command)) + { + } + } + catch (KustoRequestException exception) when (retryCount > 0 && exception.FailureCode == 401) // Unauthorized + { + RefreshAzureToken(); + retryCount--; + await ExecuteControlCommandAsync(command, throwOnError, retryCount); } catch (Exception) when (!throwOnError) { - return null; } } @@ -239,11 +251,19 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource /// /// The query. /// The results. - public Task ExecuteQueryAsync(string query, CancellationToken cancellationToken, - string databaseName = null) + public async Task> ExecuteQueryAsync(string query, CancellationToken cancellationToken, string databaseName = null) { - var reader = ExecuteQuery(query, cancellationToken, databaseName); - return Task.FromResult(reader); + try + { + var resultReader = ExecuteQuery(query, cancellationToken, databaseName); + var results = KustoDataReaderParser.ParseV1(resultReader, null); + var tableReader = results[WellKnownDataSet.PrimaryResult].Single().TableData.CreateDataReader(); + return await Task.FromResult(new ObjectReader(tableReader)); + } + catch (Exception) + { + return null; + } } private void CancelQuery(string clientRequestId) @@ -263,7 +283,7 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource try { - using (var adminOutput = _kustoAdminProvider.ExecuteControlCommand(command, null)) + using (var adminOutput = _kustoAdminProvider.ExecuteControlCommand(command)) { } }