diff --git a/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs b/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs index 934ef248..ec89c699 100644 --- a/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs +++ b/src/Microsoft.Kusto.ServiceLayer/DataSource/KustoResultsReader.cs @@ -1,21 +1,27 @@ -using System.Data; +using Microsoft.Kusto.ServiceLayer.QueryExecution; +using System.Collections.Generic; +using System.Data; namespace Microsoft.Kusto.ServiceLayer.DataSource { internal class KustoResultsReader : DataReaderWrapper { - public KustoResultsReader(IDataReader reader) : base(reader) + public KustoResultsReader(IDataReader reader) + : base(reader) { } - + /// - /// Kusto returns 3 results tables - QueryResults, QueryProperties, QueryStatus. When returning query results - /// we want the caller to only read the first table. We override the NextResult function here to only return one table - /// from the IDataReader. - /// - public override bool NextResult() - { - return false; + /// Kusto returns atleast 4 results tables - QueryResults(sometimes more than one), QueryProperties, QueryStatus and Query Results Metadata Table. + /// When returning query results we need to trim off the last 3 tables as we want the caller to only read results table. + /// + + public void SanitizeResults(List resultSets) + { + if (resultSets.Count > 3) + { + resultSets.RemoveRange(resultSets.Count - 3, 3); + } } } } \ No newline at end of file diff --git a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs index 60fe2d19..e85e9a52 100644 --- a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs +++ b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/Batch.cs @@ -6,7 +6,6 @@ using System; using System.Collections.Generic; using System.Data; using System.Data.Common; -using System.Diagnostics; using System.Data.SqlClient; using System.Linq; using System.Threading; @@ -16,7 +15,7 @@ using Microsoft.Kusto.ServiceLayer.QueryExecution.Contracts; using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage; using Microsoft.SqlTools.Utility; using System.Globalization; -using System.Collections.ObjectModel; +using Microsoft.Kusto.ServiceLayer.DataSource; namespace Microsoft.Kusto.ServiceLayer.QueryExecution { @@ -369,6 +368,14 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } while (reader.NextResult()); + KustoResultsReader kreader = reader as KustoResultsReader; + kreader.SanitizeResults(resultSets); + + foreach (var resultSet in resultSets) + { + await resultSet.SendCurrentResults(); + } + // If there were no messages, for whatever reason (NO COUNT set, messages // were emitted, records returned), output a "successful" message if (!messagesSent) diff --git a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs index bc3b0422..6403c99a 100644 --- a/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs +++ b/src/Microsoft.Kusto.ServiceLayer/QueryExecution/ResultSet.cs @@ -74,7 +74,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution /// /// Row count to use in special scenarios where we want to override the number of rows. /// - private long? rowCountOverride=null; + private long? rowCountOverride = null; /// /// The special action which applied to this result set @@ -304,7 +304,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution return Task.Factory.StartNew(() => - { + { string content; string format = null; @@ -313,12 +313,12 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // Determine the format and get the first col/row of XML content = fileStreamReader.ReadRow(0, 0, Columns)[0].DisplayValue; - if (specialAction.ExpectYukonXMLShowPlan) + if (specialAction.ExpectYukonXMLShowPlan) { format = "xml"; } } - + return new ExecutionPlan { Format = format, @@ -338,7 +338,6 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // Validate.IsNotNull(nameof(dbDataReader), dbDataReader); - Task availableTask = null; try { // Verify the request hasn't been cancelled @@ -357,11 +356,6 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // hasStartedRead = true; - // Invoke the SendCurrentResults() asynchronously that will send the results available notification - // and also trigger the timer to send periodic updates. - // - availableTask = SendCurrentResults(); - while (await dataReader.ReadAsync(cancellationToken)) { fileOffsets.Add(totalBytesWritten); @@ -371,25 +365,9 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } finally { - - // await the completion of available notification in case it is not already done before proceeding - // - await availableTask; - // now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications. // hasCompletedRead = true; - - - // Make a final call to SendCurrentResults() and await its completion. If the previously scheduled task already took care of latest status send then this should be a no-op - // - await SendCurrentResults(); - - - // and finally: - // Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol - // - await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask); } } @@ -518,7 +496,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } } }); - + // Add exception handling to the save task Task taskWithHandling = saveAsTask.ContinueWithOnFaulted(async t => { @@ -582,19 +560,8 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution #endregion - #region Private Helper Methods - /// - /// Sends the ResultsUpdated message if the number of rows has changed since last send. - /// - /// - private void SendResultAvailableOrUpdated (object stateInfo = null) - { - // Make the call to send current results and synchronously wait for it to finish - // - SendCurrentResults().Wait(); - } - - private async Task SendCurrentResults() + #region Public Helper Methods + public async Task SendCurrentResults() { try { @@ -603,7 +570,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution // sendResultsSemphore.Wait(); - ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone(); + ResultSet currentResultSetSnapshot = (ResultSet)MemberwiseClone(); if (LastUpdatedSummary == null) // We need to send results available message. { // Fire off results Available task and await it @@ -658,12 +625,30 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution } } finally - { + { + // and finally: + // Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol + // + await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask); + // Release the sendResultsSemphore so the next invocation gets unblocked // sendResultsSemphore.Release(); } } + #endregion + + #region Private Helper Methods + /// + /// Sends the ResultsUpdated message if the number of rows has changed since last send. + /// + /// + private void SendResultAvailableOrUpdated(object stateInfo = null) + { + // Make the call to send current results and synchronously wait for it to finish + // + SendCurrentResults().Wait(); + } private uint ResultsIntervalMultiplier { get; set; } = 1; @@ -696,8 +681,8 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution /// /// Determine the special action, if any, for this result set /// - private SpecialAction ProcessSpecialAction() - { + private SpecialAction ProcessSpecialAction() + { // Check if this result set is a showplan if (Columns.Length == 1 && string.Compare(Columns[0].ColumnName, YukonXmlShowPlanColumn, StringComparison.OrdinalIgnoreCase) == 0) @@ -732,7 +717,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution { throw new InvalidOperationException(SR.QueryServiceResultSetAddNoRows); } - + using (IFileStreamWriter writer = fileStreamFactory.GetWriter(outputFileName)) { // Write the row to the end of the file