From 2e6b13495398925fff9517c583c63972d4e3ad5d Mon Sep 17 00:00:00 2001 From: Shafiq Ur Rahman Date: Mon, 21 Sep 2020 10:02:23 -0700 Subject: [PATCH] Result handling for Kusto results (#1079) * Revert "Fix for displaying multiple table (#1075)" This reverts commit df0be310198de1921f4acc1565533f70eb859938. * KustoResultsReader trims results --- .../DataSource/DataReaderWrapper.cs | 14 +++- .../DataSource/KustoResultsReader.cs | 34 ++++++--- .../QueryExecution/Batch.cs | 11 +-- .../QueryExecution/ResultSet.cs | 75 +++++++++++-------- 4 files changed, 82 insertions(+), 52 deletions(-) diff --git a/src/Microsoft.Kusto.ServiceLayer/DataSource/DataReaderWrapper.cs b/src/Microsoft.Kusto.ServiceLayer/DataSource/DataReaderWrapper.cs index 61caa060..3b508343 100644 --- a/src/Microsoft.Kusto.ServiceLayer/DataSource/DataReaderWrapper.cs +++ b/src/Microsoft.Kusto.ServiceLayer/DataSource/DataReaderWrapper.cs @@ -8,8 +8,18 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource { public class DataReaderWrapper : IDataReader { - private readonly IDataReader _inner ; - public DataReaderWrapper(IDataReader inner) + private IDataReader _inner ; + + protected DataReaderWrapper() + { + } + + protected DataReaderWrapper(IDataReader inner) + { + SetDataReader(inner); + } + + protected void SetDataReader(IDataReader inner) { _inner = inner; } diff --git a/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs b/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs index ec89c699..baf60fae 100644 --- a/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs +++ b/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs @@ -1,27 +1,39 @@ -using Microsoft.Kusto.ServiceLayer.QueryExecution; -using System.Collections.Generic; +using System.Collections.Generic; using System.Data; namespace Microsoft.Kusto.ServiceLayer.DataSource { internal class KustoResultsReader : DataReaderWrapper { - public KustoResultsReader(IDataReader reader) - : base(reader) - { - } + private DataSet _resultDataSet; /// /// Kusto returns atleast 4 results tables - QueryResults(sometimes more than one), QueryProperties, QueryStatus and Query Results Metadata Table. - /// When returning query results we need to trim off the last 3 tables as we want the caller to only read results table. + /// ADS just needs query results. When returning query results we need to trim off the last 3 tables. /// - - public void SanitizeResults(List resultSets) + public KustoResultsReader(IDataReader reader) + : base() { - if (resultSets.Count > 3) + // Read out all tables + List results = new List(); + while (!(reader?.IsClosed ?? true)) { - resultSets.RemoveRange(resultSets.Count - 3, 3); + DataTable dt = new DataTable(); + dt.Load(reader); // This calls NextResult on the reader + results.Add(dt); } + + // Trim results + if(results.Count > 3) results.RemoveRange(results.Count - 3, 3); + + // Create a DataReader for the trimmed set + _resultDataSet = new DataSet(); + foreach(var result in results) + { + _resultDataSet.Tables.Add(result); + } + + SetDataReader(_resultDataSet.CreateDataReader()); } } } \ No newline at end of file diff --git a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs index e85e9a52..60fe2d19 100644 --- a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs +++ b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs @@ -6,6 +6,7 @@ using System; using System.Collections.Generic; using System.Data; using System.Data.Common; +using System.Diagnostics; using System.Data.SqlClient; using System.Linq; using System.Threading; @@ -15,7 +16,7 @@ using Microsoft.Kusto.ServiceLayer.QueryExecution.Contracts; using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage; using Microsoft.SqlTools.Utility; using System.Globalization; -using Microsoft.Kusto.ServiceLayer.DataSource; +using System.Collections.ObjectModel; namespace Microsoft.Kusto.ServiceLayer.QueryExecution { @@ -368,14 +369,6 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } while (reader.NextResult()); - KustoResultsReader kreader = reader as KustoResultsReader; - kreader.SanitizeResults(resultSets); - - foreach (var resultSet in resultSets) - { - await resultSet.SendCurrentResults(); - } - // If there were no messages, for whatever reason (NO COUNT set, messages // were emitted, records returned), output a "successful" message if (!messagesSent) diff --git a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs index 6403c99a..bc3b0422 100644 --- a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs +++ b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs @@ -74,7 +74,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution /// /// Row count to use in special scenarios where we want to override the number of rows. /// - private long? rowCountOverride = null; + private long? rowCountOverride=null; /// /// The special action which applied to this result set @@ -304,7 +304,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution return Task.Factory.StartNew(() => - { + { string content; string format = null; @@ -313,12 +313,12 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // Determine the format and get the first col/row of XML content = fileStreamReader.ReadRow(0, 0, Columns)[0].DisplayValue; - if (specialAction.ExpectYukonXMLShowPlan) + if (specialAction.ExpectYukonXMLShowPlan) { format = "xml"; } } - + return new ExecutionPlan { Format = format, @@ -338,6 +338,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // Validate.IsNotNull(nameof(dbDataReader), dbDataReader); + Task availableTask = null; try { // Verify the request hasn't been cancelled @@ -356,6 +357,11 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // hasStartedRead = true; + // Invoke the SendCurrentResults() asynchronously that will send the results available notification + // and also trigger the timer to send periodic updates. + // + availableTask = SendCurrentResults(); + while (await dataReader.ReadAsync(cancellationToken)) { fileOffsets.Add(totalBytesWritten); @@ -365,9 +371,25 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } finally { + + // await the completion of available notification in case it is not already done before proceeding + // + await availableTask; + // now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications. // hasCompletedRead = true; + + + // Make a final call to SendCurrentResults() and await its completion. If the previously scheduled task already took care of latest status send then this should be a no-op + // + await SendCurrentResults(); + + + // and finally: + // Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol + // + await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask); } } @@ -496,7 +518,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } } }); - + // Add exception handling to the save task Task taskWithHandling = saveAsTask.ContinueWithOnFaulted(async t => { @@ -560,8 +582,19 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution #endregion - #region Public Helper Methods - public async Task SendCurrentResults() + #region Private Helper Methods + /// + /// Sends the ResultsUpdated message if the number of rows has changed since last send. + /// + /// + private void SendResultAvailableOrUpdated (object stateInfo = null) + { + // Make the call to send current results and synchronously wait for it to finish + // + SendCurrentResults().Wait(); + } + + private async Task SendCurrentResults() { try { @@ -570,7 +603,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // sendResultsSemphore.Wait(); - ResultSet currentResultSetSnapshot = (ResultSet)MemberwiseClone(); + ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone(); if (LastUpdatedSummary == null) // We need to send results available message. { // Fire off results Available task and await it @@ -625,30 +658,12 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } } finally - { - // and finally: - // Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol - // - await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask); - + { // Release the sendResultsSemphore so the next invocation gets unblocked // sendResultsSemphore.Release(); } } - #endregion - - #region Private Helper Methods - /// - /// Sends the ResultsUpdated message if the number of rows has changed since last send. - /// - /// - private void SendResultAvailableOrUpdated(object stateInfo = null) - { - // Make the call to send current results and synchronously wait for it to finish - // - SendCurrentResults().Wait(); - } private uint ResultsIntervalMultiplier { get; set; } = 1; @@ -681,8 +696,8 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution /// /// Determine the special action, if any, for this result set /// - private SpecialAction ProcessSpecialAction() - { + private SpecialAction ProcessSpecialAction() + { // Check if this result set is a showplan if (Columns.Length == 1 && string.Compare(Columns[0].ColumnName, YukonXmlShowPlanColumn, StringComparison.OrdinalIgnoreCase) == 0) @@ -717,7 +732,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution { throw new InvalidOperationException(SR.QueryServiceResultSetAddNoRows); } - + using (IFileStreamWriter writer = fileStreamFactory.GetWriter(outputFileName)) { // Write the row to the end of the file