mirror of
https://github.com/ckaczor/sqltoolsservice.git
synced 2026-02-16 10:58:30 -05:00
Merge branch 'main' of https://github.com/Microsoft/sqltoolsservice into main
This commit is contained in:
@@ -1,21 +1,27 @@
|
|||||||
using System.Data;
|
using Microsoft.Kusto.ServiceLayer.QueryExecution;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Data;
|
||||||
|
|
||||||
namespace Microsoft.Kusto.ServiceLayer.DataSource
|
namespace Microsoft.Kusto.ServiceLayer.DataSource
|
||||||
{
|
{
|
||||||
internal class KustoResultsReader : DataReaderWrapper
|
internal class KustoResultsReader : DataReaderWrapper
|
||||||
{
|
{
|
||||||
public KustoResultsReader(IDataReader reader) : base(reader)
|
public KustoResultsReader(IDataReader reader)
|
||||||
|
: base(reader)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Kusto returns 3 results tables - QueryResults, QueryProperties, QueryStatus. When returning query results
|
/// Kusto returns atleast 4 results tables - QueryResults(sometimes more than one), QueryProperties, QueryStatus and Query Results Metadata Table.
|
||||||
/// we want the caller to only read the first table. We override the NextResult function here to only return one table
|
/// When returning query results we need to trim off the last 3 tables as we want the caller to only read results table.
|
||||||
/// from the IDataReader.
|
/// </summary>
|
||||||
/// </summary>
|
|
||||||
public override bool NextResult()
|
public void SanitizeResults(List<ResultSet> resultSets)
|
||||||
{
|
{
|
||||||
return false;
|
if (resultSets.Count > 3)
|
||||||
|
{
|
||||||
|
resultSets.RemoveRange(resultSets.Count - 3, 3);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -6,7 +6,6 @@ using System;
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Data;
|
using System.Data;
|
||||||
using System.Data.Common;
|
using System.Data.Common;
|
||||||
using System.Diagnostics;
|
|
||||||
using System.Data.SqlClient;
|
using System.Data.SqlClient;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
@@ -16,7 +15,7 @@ using Microsoft.Kusto.ServiceLayer.QueryExecution.Contracts;
|
|||||||
using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage;
|
using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage;
|
||||||
using Microsoft.SqlTools.Utility;
|
using Microsoft.SqlTools.Utility;
|
||||||
using System.Globalization;
|
using System.Globalization;
|
||||||
using System.Collections.ObjectModel;
|
using Microsoft.Kusto.ServiceLayer.DataSource;
|
||||||
|
|
||||||
namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
||||||
{
|
{
|
||||||
@@ -369,6 +368,14 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
|
|
||||||
} while (reader.NextResult());
|
} while (reader.NextResult());
|
||||||
|
|
||||||
|
KustoResultsReader kreader = reader as KustoResultsReader;
|
||||||
|
kreader.SanitizeResults(resultSets);
|
||||||
|
|
||||||
|
foreach (var resultSet in resultSets)
|
||||||
|
{
|
||||||
|
await resultSet.SendCurrentResults();
|
||||||
|
}
|
||||||
|
|
||||||
// If there were no messages, for whatever reason (NO COUNT set, messages
|
// If there were no messages, for whatever reason (NO COUNT set, messages
|
||||||
// were emitted, records returned), output a "successful" message
|
// were emitted, records returned), output a "successful" message
|
||||||
if (!messagesSent)
|
if (!messagesSent)
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Row count to use in special scenarios where we want to override the number of rows.
|
/// Row count to use in special scenarios where we want to override the number of rows.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private long? rowCountOverride=null;
|
private long? rowCountOverride = null;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The special action which applied to this result set
|
/// The special action which applied to this result set
|
||||||
@@ -304,7 +304,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
|
|
||||||
|
|
||||||
return Task.Factory.StartNew(() =>
|
return Task.Factory.StartNew(() =>
|
||||||
{
|
{
|
||||||
string content;
|
string content;
|
||||||
string format = null;
|
string format = null;
|
||||||
|
|
||||||
@@ -313,12 +313,12 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
// Determine the format and get the first col/row of XML
|
// Determine the format and get the first col/row of XML
|
||||||
content = fileStreamReader.ReadRow(0, 0, Columns)[0].DisplayValue;
|
content = fileStreamReader.ReadRow(0, 0, Columns)[0].DisplayValue;
|
||||||
|
|
||||||
if (specialAction.ExpectYukonXMLShowPlan)
|
if (specialAction.ExpectYukonXMLShowPlan)
|
||||||
{
|
{
|
||||||
format = "xml";
|
format = "xml";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new ExecutionPlan
|
return new ExecutionPlan
|
||||||
{
|
{
|
||||||
Format = format,
|
Format = format,
|
||||||
@@ -338,7 +338,6 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
//
|
//
|
||||||
Validate.IsNotNull(nameof(dbDataReader), dbDataReader);
|
Validate.IsNotNull(nameof(dbDataReader), dbDataReader);
|
||||||
|
|
||||||
Task availableTask = null;
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Verify the request hasn't been cancelled
|
// Verify the request hasn't been cancelled
|
||||||
@@ -357,11 +356,6 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
//
|
//
|
||||||
hasStartedRead = true;
|
hasStartedRead = true;
|
||||||
|
|
||||||
// Invoke the SendCurrentResults() asynchronously that will send the results available notification
|
|
||||||
// and also trigger the timer to send periodic updates.
|
|
||||||
//
|
|
||||||
availableTask = SendCurrentResults();
|
|
||||||
|
|
||||||
while (await dataReader.ReadAsync(cancellationToken))
|
while (await dataReader.ReadAsync(cancellationToken))
|
||||||
{
|
{
|
||||||
fileOffsets.Add(totalBytesWritten);
|
fileOffsets.Add(totalBytesWritten);
|
||||||
@@ -371,25 +365,9 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
|
||||||
// await the completion of available notification in case it is not already done before proceeding
|
|
||||||
//
|
|
||||||
await availableTask;
|
|
||||||
|
|
||||||
// now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications.
|
// now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications.
|
||||||
//
|
//
|
||||||
hasCompletedRead = true;
|
hasCompletedRead = true;
|
||||||
|
|
||||||
|
|
||||||
// Make a final call to SendCurrentResults() and await its completion. If the previously scheduled task already took care of latest status send then this should be a no-op
|
|
||||||
//
|
|
||||||
await SendCurrentResults();
|
|
||||||
|
|
||||||
|
|
||||||
// and finally:
|
|
||||||
// Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol
|
|
||||||
//
|
|
||||||
await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -518,7 +496,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Add exception handling to the save task
|
// Add exception handling to the save task
|
||||||
Task taskWithHandling = saveAsTask.ContinueWithOnFaulted(async t =>
|
Task taskWithHandling = saveAsTask.ContinueWithOnFaulted(async t =>
|
||||||
{
|
{
|
||||||
@@ -582,19 +560,8 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
#region Private Helper Methods
|
#region Public Helper Methods
|
||||||
/// <summary>
|
public async Task SendCurrentResults()
|
||||||
/// Sends the ResultsUpdated message if the number of rows has changed since last send.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="stateInfo"></param>
|
|
||||||
private void SendResultAvailableOrUpdated (object stateInfo = null)
|
|
||||||
{
|
|
||||||
// Make the call to send current results and synchronously wait for it to finish
|
|
||||||
//
|
|
||||||
SendCurrentResults().Wait();
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task SendCurrentResults()
|
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -603,7 +570,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
//
|
//
|
||||||
sendResultsSemphore.Wait();
|
sendResultsSemphore.Wait();
|
||||||
|
|
||||||
ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone();
|
ResultSet currentResultSetSnapshot = (ResultSet)MemberwiseClone();
|
||||||
if (LastUpdatedSummary == null) // We need to send results available message.
|
if (LastUpdatedSummary == null) // We need to send results available message.
|
||||||
{
|
{
|
||||||
// Fire off results Available task and await it
|
// Fire off results Available task and await it
|
||||||
@@ -658,12 +625,30 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
// and finally:
|
||||||
|
// Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol
|
||||||
|
//
|
||||||
|
await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask);
|
||||||
|
|
||||||
// Release the sendResultsSemphore so the next invocation gets unblocked
|
// Release the sendResultsSemphore so the next invocation gets unblocked
|
||||||
//
|
//
|
||||||
sendResultsSemphore.Release();
|
sendResultsSemphore.Release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
#region Private Helper Methods
|
||||||
|
/// <summary>
|
||||||
|
/// Sends the ResultsUpdated message if the number of rows has changed since last send.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="stateInfo"></param>
|
||||||
|
private void SendResultAvailableOrUpdated(object stateInfo = null)
|
||||||
|
{
|
||||||
|
// Make the call to send current results and synchronously wait for it to finish
|
||||||
|
//
|
||||||
|
SendCurrentResults().Wait();
|
||||||
|
}
|
||||||
|
|
||||||
private uint ResultsIntervalMultiplier { get; set; } = 1;
|
private uint ResultsIntervalMultiplier { get; set; } = 1;
|
||||||
|
|
||||||
@@ -696,8 +681,8 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Determine the special action, if any, for this result set
|
/// Determine the special action, if any, for this result set
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private SpecialAction ProcessSpecialAction()
|
private SpecialAction ProcessSpecialAction()
|
||||||
{
|
{
|
||||||
|
|
||||||
// Check if this result set is a showplan
|
// Check if this result set is a showplan
|
||||||
if (Columns.Length == 1 && string.Compare(Columns[0].ColumnName, YukonXmlShowPlanColumn, StringComparison.OrdinalIgnoreCase) == 0)
|
if (Columns.Length == 1 && string.Compare(Columns[0].ColumnName, YukonXmlShowPlanColumn, StringComparison.OrdinalIgnoreCase) == 0)
|
||||||
@@ -732,7 +717,7 @@ namespace Microsoft.Kusto.ServiceLayer.QueryExecution
|
|||||||
{
|
{
|
||||||
throw new InvalidOperationException(SR.QueryServiceResultSetAddNoRows);
|
throw new InvalidOperationException(SR.QueryServiceResultSetAddNoRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
using (IFileStreamWriter writer = fileStreamFactory.GetWriter(outputFileName))
|
using (IFileStreamWriter writer = fileStreamFactory.GetWriter(outputFileName))
|
||||||
{
|
{
|
||||||
// Write the row to the end of the file
|
// Write the row to the end of the file
|
||||||
|
|||||||
Reference in New Issue
Block a user