Result handling for Kusto results (#1079)

* Revert "Fix for displaying multiple table (#1075)"

This reverts commit df0be31019.

* KustoResultsReader trims results
This commit is contained in:
Shafiq Ur Rahman
2020-09-21 10:02:23 -07:00
committed by GitHub
parent fea050448d
commit 2e6b134953
4 changed files with 82 additions and 52 deletions

View File

@@ -8,8 +8,18 @@ namespace Microsoft.Kusto.ServiceLayer.DataSource
{ {
public class DataReaderWrapper : IDataReader public class DataReaderWrapper : IDataReader
{ {
private readonly IDataReader _inner ; private IDataReader _inner ;
public DataReaderWrapper(IDataReader inner)
protected DataReaderWrapper()
{
}
protected DataReaderWrapper(IDataReader inner)
{
SetDataReader(inner);
}
protected void SetDataReader(IDataReader inner)
{ {
_inner = inner; _inner = inner;
} }

View File

@@ -1,27 +1,39 @@
using Microsoft.Kusto.ServiceLayer.QueryExecution; using System.Collections.Generic;
using System.Collections.Generic;
using System.Data; using System.Data;
namespace Microsoft.Kusto.ServiceLayer.DataSource namespace Microsoft.Kusto.ServiceLayer.DataSource
{ {
internal class KustoResultsReader : DataReaderWrapper internal class KustoResultsReader : DataReaderWrapper
{ {
public KustoResultsReader(IDataReader reader) private DataSet _resultDataSet;
: base(reader)
{
}
/// <summary> /// <summary>
/// Kusto returns atleast 4 results tables - QueryResults(sometimes more than one), QueryProperties, QueryStatus and Query Results Metadata Table. /// Kusto returns atleast 4 results tables - QueryResults(sometimes more than one), QueryProperties, QueryStatus and Query Results Metadata Table.
/// When returning query results we need to trim off the last 3 tables as we want the caller to only read results table. /// ADS just needs query results. When returning query results we need to trim off the last 3 tables.
/// </summary> /// </summary>
public KustoResultsReader(IDataReader reader)
: base()
{
// Read out all tables
List<DataTable> results = new List<DataTable>();
while (!(reader?.IsClosed ?? true))
{
DataTable dt = new DataTable();
dt.Load(reader); // This calls NextResult on the reader
results.Add(dt);
}
public void SanitizeResults(List<ResultSet> resultSets) // Trim results
if(results.Count > 3) results.RemoveRange(results.Count - 3, 3);
// Create a DataReader for the trimmed set
_resultDataSet = new DataSet();
foreach(var result in results)
{ {
if (resultSets.Count > 3) _resultDataSet.Tables.Add(result);
{ }
resultSets.RemoveRange(resultSets.Count - 3, 3);
} SetDataReader(_resultDataSet.CreateDataReader());
} }
} }
} }

View File

@@ -6,6 +6,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data; using System.Data;
using System.Data.Common; using System.Data.Common;
using System.Diagnostics;
using System.Data.SqlClient; using System.Data.SqlClient;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
@@ -15,7 +16,7 @@ using Microsoft.Kusto.ServiceLayer.QueryExecution.Contracts;
using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage; using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage;
using Microsoft.SqlTools.Utility; using Microsoft.SqlTools.Utility;
using System.Globalization; using System.Globalization;
using Microsoft.Kusto.ServiceLayer.DataSource; using System.Collections.ObjectModel;
namespace Microsoft.Kusto.ServiceLayer.QueryExecution namespace Microsoft.Kusto.ServiceLayer.QueryExecution
{ {
@@ -368,14 +369,6 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
} while (reader.NextResult()); } while (reader.NextResult());
KustoResultsReader kreader = reader as KustoResultsReader;
kreader.SanitizeResults(resultSets);
foreach (var resultSet in resultSets)
{
await resultSet.SendCurrentResults();
}
// If there were no messages, for whatever reason (NO COUNT set, messages // If there were no messages, for whatever reason (NO COUNT set, messages
// were emitted, records returned), output a "successful" message // were emitted, records returned), output a "successful" message
if (!messagesSent) if (!messagesSent)

View File

@@ -338,6 +338,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
// //
Validate.IsNotNull(nameof(dbDataReader), dbDataReader); Validate.IsNotNull(nameof(dbDataReader), dbDataReader);
Task availableTask = null;
try try
{ {
// Verify the request hasn't been cancelled // Verify the request hasn't been cancelled
@@ -356,6 +357,11 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
// //
hasStartedRead = true; hasStartedRead = true;
// Invoke the SendCurrentResults() asynchronously that will send the results available notification
// and also trigger the timer to send periodic updates.
//
availableTask = SendCurrentResults();
while (await dataReader.ReadAsync(cancellationToken)) while (await dataReader.ReadAsync(cancellationToken))
{ {
fileOffsets.Add(totalBytesWritten); fileOffsets.Add(totalBytesWritten);
@@ -365,9 +371,25 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
} }
finally finally
{ {
// await the completion of available notification in case it is not already done before proceeding
//
await availableTask;
// now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications. // now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications.
// //
hasCompletedRead = true; hasCompletedRead = true;
// Make a final call to SendCurrentResults() and await its completion. If the previously scheduled task already took care of latest status send then this should be a no-op
//
await SendCurrentResults();
// and finally:
// Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol
//
await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask);
} }
} }
@@ -560,8 +582,19 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
#endregion #endregion
#region Public Helper Methods #region Private Helper Methods
public async Task SendCurrentResults() /// <summary>
/// Sends the ResultsUpdated message if the number of rows has changed since last send.
/// </summary>
/// <param name="stateInfo"></param>
private void SendResultAvailableOrUpdated (object stateInfo = null)
{
// Make the call to send current results and synchronously wait for it to finish
//
SendCurrentResults().Wait();
}
private async Task SendCurrentResults()
{ {
try try
{ {
@@ -626,29 +659,11 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
} }
finally finally
{ {
// and finally:
// Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol
//
await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask);
// Release the sendResultsSemphore so the next invocation gets unblocked // Release the sendResultsSemphore so the next invocation gets unblocked
// //
sendResultsSemphore.Release(); sendResultsSemphore.Release();
} }
} }
#endregion
#region Private Helper Methods
/// <summary>
/// Sends the ResultsUpdated message if the number of rows has changed since last send.
/// </summary>
/// <param name="stateInfo"></param>
private void SendResultAvailableOrUpdated(object stateInfo = null)
{
// Make the call to send current results and synchronously wait for it to finish
//
SendCurrentResults().Wait();
}
private uint ResultsIntervalMultiplier { get; set; } = 1; private uint ResultsIntervalMultiplier { get; set; } = 1;