mirror of
https://github.com/ckaczor/sqltoolsservice.git
synced 2026-01-26 17:24:21 -05:00
Progressive Results Part 2: Result Completion Event (#134)
The main change in this pull request is to add a new event that will be fired upon completion of a resultset but before the completion of a batch. This event will only fire if a resultset is available and generated. Changes: * ConnectionService - Slight changes to enable mocking, cleanup * Batch - Moving summary generation into ResultSet class, adding generation of ordinals for resultset and locking of result set list (which needs further refinement, but would be outside scope of this change) * Adding new event and associated parameters for completion of a resultset. Params return the resultset summary * Adding logic for assigning the event a handler in the query execution service * Adding unit tests for testing the new event /making sure the existing tests work * Refactoring some private properties into member variables * Refactor to remove SectionData class in favor of BufferRange * Adding callback for batch completion that will let the extension know that a batch has completed execution * Refactoring to make progressive results work as per async query execution * Allowing retrieval of batch results while query is in progress * reverting global.json, whoops * Adding a few missing comments, and fixing a couple code style bugs * Using SelectionData everywhere again * One more missing comment * Adding new notification type for result set completion * Plumbing event for result set completion * Unit tests for result set events This includes a fairly substantial change to create a mock of the ConnectionService and to create separate memorystream storage arrays. It preserves more correct behavior with a integration test, fixes an issue where the test db reader will return n-1 rows because the Reliable Connection Helper steals a record. * Adding locking to ResultSets for thread safety * Adding/fixing unit tests * Adding batch ID to result set summary
This commit is contained in:
@@ -29,7 +29,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
/// <summary>
|
||||
/// Singleton service instance
|
||||
/// </summary>
|
||||
private static Lazy<ConnectionService> instance
|
||||
private static readonly Lazy<ConnectionService> instance
|
||||
= new Lazy<ConnectionService>(() => new ConnectionService());
|
||||
|
||||
/// <summary>
|
||||
@@ -48,11 +48,11 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
/// </summary>
|
||||
private ISqlConnectionFactory connectionFactory;
|
||||
|
||||
private Dictionary<string, ConnectionInfo> ownerToConnectionMap = new Dictionary<string, ConnectionInfo>();
|
||||
private readonly Dictionary<string, ConnectionInfo> ownerToConnectionMap = new Dictionary<string, ConnectionInfo>();
|
||||
|
||||
private ConcurrentDictionary<string, CancellationTokenSource> ownerToCancellationTokenSourceMap = new ConcurrentDictionary<string, CancellationTokenSource>();
|
||||
private readonly ConcurrentDictionary<string, CancellationTokenSource> ownerToCancellationTokenSourceMap = new ConcurrentDictionary<string, CancellationTokenSource>();
|
||||
|
||||
private Object cancellationTokenSourceLock = new Object();
|
||||
private readonly object cancellationTokenSourceLock = new object();
|
||||
|
||||
/// <summary>
|
||||
/// Map from script URIs to ConnectionInfo objects
|
||||
@@ -173,13 +173,12 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
connectionInfo = new ConnectionInfo(ConnectionFactory, connectionParams.OwnerUri, connectionParams.Connection);
|
||||
|
||||
// try to connect
|
||||
var response = new ConnectionCompleteParams();
|
||||
response.OwnerUri = connectionParams.OwnerUri;
|
||||
var response = new ConnectionCompleteParams {OwnerUri = connectionParams.OwnerUri};
|
||||
CancellationTokenSource source = null;
|
||||
try
|
||||
{
|
||||
// build the connection string from the input parameters
|
||||
string connectionString = ConnectionService.BuildConnectionString(connectionInfo.ConnectionDetails);
|
||||
string connectionString = BuildConnectionString(connectionInfo.ConnectionDetails);
|
||||
|
||||
// create a sql connection instance
|
||||
connectionInfo.SqlConnection = connectionInfo.Factory.CreateSqlConnection(connectionString);
|
||||
@@ -261,7 +260,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
// Update with the actual database name in connectionInfo and result
|
||||
// Doing this here as we know the connection is open - expect to do this only on connecting
|
||||
connectionInfo.ConnectionDetails.DatabaseName = connectionInfo.SqlConnection.Database;
|
||||
response.ConnectionSummary = new ConnectionSummary()
|
||||
response.ConnectionSummary = new ConnectionSummary
|
||||
{
|
||||
ServerName = connectionInfo.ConnectionDetails.ServerName,
|
||||
DatabaseName = connectionInfo.ConnectionDetails.DatabaseName,
|
||||
@@ -269,7 +268,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
};
|
||||
|
||||
// invoke callback notifications
|
||||
invokeOnConnectionActivities(connectionInfo);
|
||||
InvokeOnConnectionActivities(connectionInfo);
|
||||
|
||||
// try to get information about the connected SQL Server instance
|
||||
try
|
||||
@@ -278,7 +277,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
DbConnection connection = reliableConnection != null ? reliableConnection.GetUnderlyingConnection() : connectionInfo.SqlConnection;
|
||||
|
||||
ReliableConnectionHelper.ServerInfo serverInfo = ReliableConnectionHelper.GetServerVersion(connection);
|
||||
response.ServerInfo = new Contracts.ServerInfo()
|
||||
response.ServerInfo = new ServerInfo
|
||||
{
|
||||
ServerMajorVersion = serverInfo.ServerMajorVersion,
|
||||
ServerMinorVersion = serverInfo.ServerMinorVersion,
|
||||
@@ -399,7 +398,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
connection.Open();
|
||||
|
||||
List<string> results = new List<string>();
|
||||
var systemDatabases = new string[] {"master", "model", "msdb", "tempdb"};
|
||||
var systemDatabases = new[] {"master", "model", "msdb", "tempdb"};
|
||||
using (DbCommand command = connection.CreateCommand())
|
||||
{
|
||||
command.CommandText = "SELECT name FROM sys.databases ORDER BY name ASC";
|
||||
@@ -473,7 +472,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
|
||||
try
|
||||
{
|
||||
RunConnectRequestHandlerTask(connectParams, requestContext);
|
||||
RunConnectRequestHandlerTask(connectParams);
|
||||
await requestContext.SendResult(true);
|
||||
}
|
||||
catch
|
||||
@@ -482,7 +481,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
}
|
||||
}
|
||||
|
||||
private void RunConnectRequestHandlerTask(ConnectParams connectParams, RequestContext<bool> requestContext)
|
||||
private void RunConnectRequestHandlerTask(ConnectParams connectParams)
|
||||
{
|
||||
// create a task to connect asynchronously so that other requests are not blocked in the meantime
|
||||
Task.Run(async () =>
|
||||
@@ -490,7 +489,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
try
|
||||
{
|
||||
// open connection based on request details
|
||||
ConnectionCompleteParams result = await ConnectionService.Instance.Connect(connectParams);
|
||||
ConnectionCompleteParams result = await Instance.Connect(connectParams);
|
||||
await ServiceHost.SendEvent(ConnectionCompleteNotification.Type, result);
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -515,7 +514,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
|
||||
try
|
||||
{
|
||||
bool result = ConnectionService.Instance.CancelConnect(cancelParams);
|
||||
bool result = Instance.CancelConnect(cancelParams);
|
||||
await requestContext.SendResult(result);
|
||||
}
|
||||
catch(Exception ex)
|
||||
@@ -535,7 +534,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
|
||||
try
|
||||
{
|
||||
bool result = ConnectionService.Instance.Disconnect(disconnectParams);
|
||||
bool result = Instance.Disconnect(disconnectParams);
|
||||
await requestContext.SendResult(result);
|
||||
}
|
||||
catch(Exception ex)
|
||||
@@ -556,7 +555,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
|
||||
try
|
||||
{
|
||||
ListDatabasesResponse result = ConnectionService.Instance.ListDatabases(listDatabasesParams);
|
||||
ListDatabasesResponse result = Instance.ListDatabases(listDatabasesParams);
|
||||
await requestContext.SendResult(result);
|
||||
}
|
||||
catch(Exception ex)
|
||||
@@ -579,10 +578,12 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
/// <param name="connectionDetails"></param>
|
||||
public static string BuildConnectionString(ConnectionDetails connectionDetails)
|
||||
{
|
||||
SqlConnectionStringBuilder connectionBuilder = new SqlConnectionStringBuilder();
|
||||
connectionBuilder["Data Source"] = connectionDetails.ServerName;
|
||||
connectionBuilder["User Id"] = connectionDetails.UserName;
|
||||
connectionBuilder["Password"] = connectionDetails.Password;
|
||||
SqlConnectionStringBuilder connectionBuilder = new SqlConnectionStringBuilder
|
||||
{
|
||||
["Data Source"] = connectionDetails.ServerName,
|
||||
["User Id"] = connectionDetails.UserName,
|
||||
["Password"] = connectionDetails.Password
|
||||
};
|
||||
|
||||
// Check for any optional parameters
|
||||
if (!string.IsNullOrEmpty(connectionDetails.DatabaseName))
|
||||
@@ -722,7 +723,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
|
||||
// Fire a connection changed event
|
||||
ConnectionChangedParams parameters = new ConnectionChangedParams();
|
||||
ConnectionSummary summary = (ConnectionSummary)(info.ConnectionDetails);
|
||||
ConnectionSummary summary = info.ConnectionDetails;
|
||||
parameters.Connection = summary.Clone();
|
||||
parameters.OwnerUri = ownerUri;
|
||||
ServiceHost.SendEvent(ConnectionChangedNotification.Type, parameters);
|
||||
@@ -741,7 +742,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
|
||||
}
|
||||
}
|
||||
|
||||
private void invokeOnConnectionActivities(ConnectionInfo connectionInfo)
|
||||
private void InvokeOnConnectionActivities(ConnectionInfo connectionInfo)
|
||||
{
|
||||
foreach (var activity in this.onConnectionActivities)
|
||||
{
|
||||
|
||||
@@ -88,6 +88,12 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// </summary>
|
||||
public event BatchAsyncEventHandler BatchCompletion;
|
||||
|
||||
/// <summary>
|
||||
/// Event that will be called when the resultset has completed execution. It will not be
|
||||
/// called from the Batch but from the ResultSet instance
|
||||
/// </summary>
|
||||
public event ResultSet.ResultSetAsyncEventHandler ResultSetCompletion;
|
||||
|
||||
/// <summary>
|
||||
/// The text of batch that will be executed
|
||||
/// </summary>
|
||||
@@ -155,12 +161,10 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
{
|
||||
get
|
||||
{
|
||||
return ResultSets.Select((set, index) => new ResultSetSummary()
|
||||
lock (resultSets)
|
||||
{
|
||||
ColumnInfo = set.Columns,
|
||||
Id = index,
|
||||
RowCount = set.RowCount
|
||||
}).ToArray();
|
||||
return resultSets.Select(set => set.Summary).ToArray();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,7 +225,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
command = sqlConn.GetUnderlyingConnection().CreateCommand();
|
||||
|
||||
// Add a handler for when the command completes
|
||||
SqlCommand sqlCommand = (SqlCommand) command;
|
||||
SqlCommand sqlCommand = (SqlCommand)command;
|
||||
sqlCommand.StatementCompleted += StatementCompletedHandler;
|
||||
}
|
||||
else
|
||||
@@ -244,6 +248,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
// Execute the command to get back a reader
|
||||
using (DbDataReader reader = await command.ExecuteReaderAsync(cancellationToken))
|
||||
{
|
||||
int resultSetOrdinal = 0;
|
||||
do
|
||||
{
|
||||
// Skip this result set if there aren't any rows (ie, UPDATE/DELETE/etc queries)
|
||||
@@ -253,11 +258,16 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
}
|
||||
|
||||
// This resultset has results (ie, SELECT/etc queries)
|
||||
ResultSet resultSet = new ResultSet(reader, outputFileFactory);
|
||||
|
||||
ResultSet resultSet = new ResultSet(reader, resultSetOrdinal, Id, outputFileFactory);
|
||||
resultSet.ResultCompletion += ResultSetCompletion;
|
||||
|
||||
// Add the result set to the results of the query
|
||||
resultSets.Add(resultSet);
|
||||
|
||||
lock (resultSets)
|
||||
{
|
||||
resultSets.Add(resultSet);
|
||||
resultSetOrdinal++;
|
||||
}
|
||||
|
||||
// Read until we hit the end of the result set
|
||||
await resultSet.ReadResultToEnd(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
@@ -318,20 +328,21 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// <returns>A subset of results</returns>
|
||||
public Task<ResultSetSubset> GetSubset(int resultSetIndex, int startRow, int rowCount)
|
||||
{
|
||||
// Sanity check to make sure that the batch has finished
|
||||
if (!HasExecuted)
|
||||
ResultSet targetResultSet;
|
||||
lock (resultSets)
|
||||
{
|
||||
throw new InvalidOperationException(SR.QueryServiceSubsetBatchNotCompleted);
|
||||
}
|
||||
// Sanity check to make sure we have valid numbers
|
||||
if (resultSetIndex < 0 || resultSetIndex >= resultSets.Count)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(resultSetIndex),
|
||||
SR.QueryServiceSubsetResultSetOutOfRange);
|
||||
}
|
||||
|
||||
// Sanity check to make sure we have valid numbers
|
||||
if (resultSetIndex < 0 || resultSetIndex >= resultSets.Count)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(resultSetIndex), SR.QueryServiceSubsetResultSetOutOfRange);
|
||||
targetResultSet = resultSets[resultSetIndex];
|
||||
}
|
||||
|
||||
// Retrieve the result set
|
||||
return resultSets[resultSetIndex].GetSubset(startRow, rowCount);
|
||||
return targetResultSet.GetSubset(startRow, rowCount);
|
||||
}
|
||||
|
||||
#endregion
|
||||
@@ -431,9 +442,12 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
|
||||
if (disposing)
|
||||
{
|
||||
foreach (ResultSet r in ResultSets)
|
||||
lock (resultSets)
|
||||
{
|
||||
r.Dispose();
|
||||
foreach (ResultSet r in resultSets)
|
||||
{
|
||||
r.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
//
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
|
||||
using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol.Contracts;
|
||||
|
||||
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
|
||||
{
|
||||
public class QueryExecuteResultSetCompleteParams
|
||||
{
|
||||
public ResultSetSummary ResultSetSummary { get; set; }
|
||||
|
||||
public string OwnerUri { get; set; }
|
||||
}
|
||||
|
||||
public class QueryExecuteResultSetCompleteEvent
|
||||
{
|
||||
public static readonly
|
||||
EventType<QueryExecuteResultSetCompleteParams> Type =
|
||||
EventType<QueryExecuteResultSetCompleteParams>.Create("query/resultSetComplete");
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
|
||||
/// </summary>
|
||||
public int Id { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The ID of the batch set within the query
|
||||
/// </summary>
|
||||
public int BatchId { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The number of rows that was returned with the resultset
|
||||
/// </summary>
|
||||
|
||||
@@ -92,25 +92,13 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
Batches = batchSelection.ToArray();
|
||||
}
|
||||
|
||||
#region Properties
|
||||
|
||||
/// <summary>
|
||||
/// Delegate type for callback when a query completes or fails
|
||||
/// </summary>
|
||||
/// <param name="q">The query that completed</param>
|
||||
public delegate Task QueryAsyncEventHandler(Query q);
|
||||
#region Events
|
||||
|
||||
/// <summary>
|
||||
/// Event to be called when a batch is completed.
|
||||
/// </summary>
|
||||
public event Batch.BatchAsyncEventHandler BatchCompleted;
|
||||
|
||||
/// <summary>
|
||||
/// Delegate type for callback when a query connection fails
|
||||
/// </summary>
|
||||
/// <param name="message">Message to return</param>
|
||||
public delegate Task QueryAsyncErrorEventHandler(string message);
|
||||
|
||||
/// <summary>
|
||||
/// Callback for when the query has completed successfully
|
||||
/// </summary>
|
||||
@@ -126,6 +114,27 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// </summary>
|
||||
public event QueryAsyncErrorEventHandler QueryConnectionException;
|
||||
|
||||
/// <summary>
|
||||
/// Event to be called when a resultset has completed.
|
||||
/// </summary>
|
||||
public event ResultSet.ResultSetAsyncEventHandler ResultSetCompleted;
|
||||
|
||||
#endregion
|
||||
|
||||
#region Properties
|
||||
|
||||
/// <summary>
|
||||
/// Delegate type for callback when a query completes or fails
|
||||
/// </summary>
|
||||
/// <param name="q">The query that completed</param>
|
||||
public delegate Task QueryAsyncEventHandler(Query q);
|
||||
|
||||
/// <summary>
|
||||
/// Delegate type for callback when a query connection fails
|
||||
/// </summary>
|
||||
/// <param name="message">Message to return</param>
|
||||
public delegate Task QueryAsyncErrorEventHandler(string message);
|
||||
|
||||
/// <summary>
|
||||
/// The batches underneath this query
|
||||
/// </summary>
|
||||
@@ -146,6 +155,10 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Storage for the async task for execution. Set as internal in order to await completion
|
||||
/// in unit tests.
|
||||
/// </summary>
|
||||
internal Task ExecutionTask { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
@@ -242,11 +255,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
{
|
||||
await conn.OpenAsync();
|
||||
}
|
||||
catch(Exception exception)
|
||||
catch (Exception exception)
|
||||
{
|
||||
this.HasExecuted = true;
|
||||
this.HasExecuted = true;
|
||||
if (QueryConnectionException != null)
|
||||
{
|
||||
{
|
||||
await QueryConnectionException(exception.Message);
|
||||
}
|
||||
return;
|
||||
@@ -265,6 +278,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
foreach (Batch b in Batches)
|
||||
{
|
||||
b.BatchCompletion += BatchCompleted;
|
||||
b.ResultSetCompletion += ResultSetCompleted;
|
||||
await b.Execute(conn, cancellationSource.Token);
|
||||
}
|
||||
|
||||
|
||||
@@ -437,7 +437,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
};
|
||||
await requestContext.SendEvent(QueryExecuteCompleteEvent.Type, eventParams);
|
||||
};
|
||||
|
||||
query.QueryCompleted += callback;
|
||||
query.QueryFailed += callback;
|
||||
query.QueryConnectionException += errorCallback;
|
||||
@@ -454,6 +453,18 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
};
|
||||
query.BatchCompleted += batchCallback;
|
||||
|
||||
// Setup the ResultSet completion callback
|
||||
ResultSet.ResultSetAsyncEventHandler resultCallback = async r =>
|
||||
{
|
||||
QueryExecuteResultSetCompleteParams eventParams = new QueryExecuteResultSetCompleteParams
|
||||
{
|
||||
ResultSetSummary = r.Summary,
|
||||
OwnerUri = executeParams.OwnerUri
|
||||
};
|
||||
await requestContext.SendEvent(QueryExecuteResultSetCompleteEvent.Type, eventParams);
|
||||
};
|
||||
query.ResultSetCompleted += resultCallback;
|
||||
|
||||
// Launch this as an asynchronous task
|
||||
query.Execute();
|
||||
|
||||
|
||||
@@ -59,9 +59,10 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
private readonly IFileStreamFactory fileStreamFactory;
|
||||
|
||||
/// <summary>
|
||||
/// Whether or not the result set has been read in from the database
|
||||
/// Whether or not the result set has been read in from the database,
|
||||
/// set as internal in order to fake value in unit tests
|
||||
/// </summary>
|
||||
private bool hasBeenRead;
|
||||
internal bool hasBeenRead;
|
||||
|
||||
/// <summary>
|
||||
/// Whether resultSet is a 'for xml' or 'for json' result
|
||||
@@ -74,7 +75,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
private readonly string outputFileName;
|
||||
|
||||
/// <summary>
|
||||
/// Whether the resultSet is in the process of being disposed
|
||||
/// All save tasks currently saving this ResultSet
|
||||
/// </summary>
|
||||
private readonly ConcurrentDictionary<string, Task> saveTasks;
|
||||
|
||||
@@ -84,13 +85,17 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// Creates a new result set and initializes its state
|
||||
/// </summary>
|
||||
/// <param name="reader">The reader from executing a query</param>
|
||||
/// <param name="ordinal">The ID of the resultset, the ordinal of the result within the batch</param>
|
||||
/// <param name="batchOrdinal">The ID of the batch, the ordinal of the batch within the query</param>
|
||||
/// <param name="factory">Factory for creating a reader/writer</param>
|
||||
public ResultSet(DbDataReader reader, IFileStreamFactory factory)
|
||||
public ResultSet(DbDataReader reader, int ordinal, int batchOrdinal, IFileStreamFactory factory)
|
||||
{
|
||||
// Sanity check to make sure we got a reader
|
||||
Validate.IsNotNull(nameof(reader), SR.QueryServiceResultSetReaderNull);
|
||||
|
||||
dataReader = new StorageDataReader(reader);
|
||||
Id = ordinal;
|
||||
BatchId = batchOrdinal;
|
||||
|
||||
// Initialize the storage
|
||||
outputFileName = factory.CreateFile();
|
||||
@@ -104,6 +109,17 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
|
||||
#region Properties
|
||||
|
||||
/// <summary>
|
||||
/// Asynchronous handler for when a resultset has completed
|
||||
/// </summary>
|
||||
/// <param name="resultSet">The result set that completed</param>
|
||||
public delegate Task ResultSetAsyncEventHandler(ResultSet resultSet);
|
||||
|
||||
/// <summary>
|
||||
/// Event that will be called when the result set has completed execution
|
||||
/// </summary>
|
||||
public event ResultSetAsyncEventHandler ResultCompletion;
|
||||
|
||||
/// <summary>
|
||||
/// Whether the resultSet is in the process of being disposed
|
||||
/// </summary>
|
||||
@@ -115,6 +131,16 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// </summary>
|
||||
public DbColumnWrapper[] Columns { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// ID of the result set, relative to the batch
|
||||
/// </summary>
|
||||
public int Id { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// ID of the batch set, relative to the query
|
||||
/// </summary>
|
||||
public int BatchId { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Maximum number of characters to store for a field
|
||||
/// </summary>
|
||||
@@ -130,6 +156,23 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// </summary>
|
||||
public long RowCount { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Generates a summary of this result set
|
||||
/// </summary>
|
||||
public ResultSetSummary Summary
|
||||
{
|
||||
get
|
||||
{
|
||||
return new ResultSetSummary
|
||||
{
|
||||
ColumnInfo = Columns,
|
||||
Id = Id,
|
||||
BatchId = BatchId,
|
||||
RowCount = RowCount
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Public Methods
|
||||
@@ -170,9 +213,9 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
if (isSingleColumnXmlJsonResultSet)
|
||||
{
|
||||
// Iterate over all the rows and process them into a list of string builders
|
||||
// ReSharper disable once AccessToDisposedClosure The lambda is used immediately in string.Join call
|
||||
IEnumerable<string> rowValues = fileOffsets.Select(rowOffset => fileStreamReader.ReadRow(rowOffset, Columns)[0].DisplayValue);
|
||||
rows = new[] { new[] { string.Join(string.Empty, rowValues) } };
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -180,8 +223,9 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
IEnumerable<long> rowOffsets = fileOffsets.Skip(startRow).Take(rowCount);
|
||||
|
||||
// Iterate over the rows we need and process them into output
|
||||
rows = rowOffsets.Select(rowOffset =>
|
||||
fileStreamReader.ReadRow(rowOffset, Columns).Select(cell => cell.DisplayValue).ToArray())
|
||||
// ReSharper disable once AccessToDisposedClosure The lambda is used immediately in .ToArray call
|
||||
rows = rowOffsets.Select(rowOffset => fileStreamReader.ReadRow(rowOffset, Columns)
|
||||
.Select(cell => cell.DisplayValue).ToArray())
|
||||
.ToArray();
|
||||
|
||||
}
|
||||
@@ -201,33 +245,41 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// <param name="cancellationToken">Cancellation token for cancelling the query</param>
|
||||
public async Task ReadResultToEnd(CancellationToken cancellationToken)
|
||||
{
|
||||
// Mark that result has been read
|
||||
hasBeenRead = true;
|
||||
|
||||
// Open a writer for the file
|
||||
using (IFileStreamWriter fileWriter = fileStreamFactory.GetWriter(outputFileName, MaxCharsToStore, MaxXmlCharsToStore))
|
||||
try
|
||||
{
|
||||
// If we can initialize the columns using the column schema, use that
|
||||
if (!dataReader.DbDataReader.CanGetColumnSchema())
|
||||
// Mark that result has been read
|
||||
hasBeenRead = true;
|
||||
|
||||
// Open a writer for the file
|
||||
var fileWriter = fileStreamFactory.GetWriter(outputFileName, MaxCharsToStore, MaxCharsToStore);
|
||||
using (fileWriter)
|
||||
{
|
||||
throw new InvalidOperationException(SR.QueryServiceResultSetNoColumnSchema);
|
||||
// If we can initialize the columns using the column schema, use that
|
||||
if (!dataReader.DbDataReader.CanGetColumnSchema())
|
||||
{
|
||||
throw new InvalidOperationException(SR.QueryServiceResultSetNoColumnSchema);
|
||||
}
|
||||
Columns = dataReader.Columns;
|
||||
long currentFileOffset = 0;
|
||||
|
||||
while (await dataReader.ReadAsync(cancellationToken))
|
||||
{
|
||||
RowCount++;
|
||||
fileOffsets.Add(currentFileOffset);
|
||||
currentFileOffset += fileWriter.WriteRow(dataReader);
|
||||
}
|
||||
}
|
||||
Columns = dataReader.Columns;
|
||||
|
||||
long currentFileOffset = 0;
|
||||
while (await dataReader.ReadAsync(cancellationToken))
|
||||
// Check if resultset is 'for xml/json'. If it is, set isJson/isXml value in column metadata
|
||||
SingleColumnXmlJsonResultSet();
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Fire off a result set completion event if we have one
|
||||
if (ResultCompletion != null)
|
||||
{
|
||||
// Store the beginning of the row
|
||||
long rowStart = currentFileOffset;
|
||||
currentFileOffset += fileWriter.WriteRow(dataReader);
|
||||
|
||||
// Add the row to the list of rows we have only if the row was successfully written
|
||||
RowCount++;
|
||||
fileOffsets.Add(rowStart);
|
||||
await ResultCompletion(this);
|
||||
}
|
||||
}
|
||||
// Check if resultset is 'for xml/json'. If it is, set isJson/isXml value in column metadata
|
||||
SingleColumnXmlJsonResultSet();
|
||||
}
|
||||
|
||||
#endregion
|
||||
@@ -284,10 +336,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
/// If the result set represented by this class corresponds to a single JSON
|
||||
/// column that contains results of "for json" query, set isJson = true
|
||||
/// </summary>
|
||||
private void SingleColumnXmlJsonResultSet() {
|
||||
private void SingleColumnXmlJsonResultSet()
|
||||
{
|
||||
|
||||
if (Columns?.Length == 1 && RowCount != 0)
|
||||
{
|
||||
{
|
||||
if (Columns[0].ColumnName.Equals(NameOfForXMLColumn, StringComparison.Ordinal))
|
||||
{
|
||||
Columns[0].IsXml = true;
|
||||
@@ -299,7 +352,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
Columns[0].IsJson = true;
|
||||
isSingleColumnXmlJsonResultSet = true;
|
||||
RowCount = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user