mirror of
https://github.com/ckaczor/sqltoolsservice.git
synced 2026-01-16 17:23:38 -05:00
Make query execution truly asynchronous (#83)
The two main changes in this pull request: Launching query execution as an asynchronous task that performs a callback upon completion or failure of a query. (Which also sets us up for callbacks progressive results) Moving away from using the Result of a query execution to return an error. Instead we'll use an error event to return an error Additionally, some nice refactoring and cleaning up of the unit tests to take advantage of the cool RequestContext mock tooling by @kevcunnane * Initial commit of refactor to run execution truely asynchronously * Moving the storage of the task into Query class Callbacks for completion of a query and failure of a query are setup as events in the Query class. This actually sets us up for a very nice framework for adding batch and resultset completion callbacks. However, this also exposes a problem with cancelling queries and returning errors -- we don't properly handle errors during execution of a query (aside from DB errors). * Wrapping things up in order to submit for code review * Adding fixes as per comments
This commit is contained in:
@@ -31,7 +31,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
private bool disposed;
|
||||
|
||||
/// <summary>
|
||||
/// Factory for creating readers/writrs for the output of the batch
|
||||
/// Factory for creating readers/writers for the output of the batch
|
||||
/// </summary>
|
||||
private readonly IFileStreamFactory outputFileFactory;
|
||||
|
||||
@@ -151,7 +151,8 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
}
|
||||
|
||||
// Make sure we aren't using a ReliableCommad since we do not want automatic retry
|
||||
Debug.Assert(!(command is ReliableSqlConnection.ReliableSqlCommand), "ReliableSqlCommand command should not be used to execute queries");
|
||||
Debug.Assert(!(command is ReliableSqlConnection.ReliableSqlCommand),
|
||||
"ReliableSqlCommand command should not be used to execute queries");
|
||||
|
||||
// Create a command that we'll use for executing the query
|
||||
using (command)
|
||||
@@ -170,18 +171,19 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
{
|
||||
// Create a message with the number of affected rows -- IF the query affects rows
|
||||
resultMessages.Add(new ResultMessage(reader.RecordsAffected >= 0
|
||||
? SR.QueryServiceAffectedRows(reader.RecordsAffected)
|
||||
: SR.QueryServiceCompletedSuccessfully));
|
||||
? SR.QueryServiceAffectedRows(reader.RecordsAffected)
|
||||
: SR.QueryServiceCompletedSuccessfully));
|
||||
continue;
|
||||
}
|
||||
|
||||
// This resultset has results (ie, SELECT/etc queries)
|
||||
// Read until we hit the end of the result set
|
||||
ResultSet resultSet = new ResultSet(reader, outputFileFactory);
|
||||
await resultSet.ReadResultToEnd(cancellationToken);
|
||||
|
||||
|
||||
// Add the result set to the results of the query
|
||||
resultSets.Add(resultSet);
|
||||
|
||||
// Read until we hit the end of the result set
|
||||
await resultSet.ReadResultToEnd(cancellationToken);
|
||||
|
||||
// Add a message for the number of rows the query returned
|
||||
resultMessages.Add(new ResultMessage(SR.QueryServiceAffectedRows(resultSet.RowCount)));
|
||||
@@ -194,9 +196,15 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
HasError = true;
|
||||
UnwrapDbException(dbe);
|
||||
}
|
||||
catch (Exception)
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
resultMessages.Add(new ResultMessage(SR.QueryServiceQueryCancelled));
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
HasError = true;
|
||||
resultMessages.Add(new ResultMessage(SR.QueryServiceQueryFailed(e.Message)));
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
|
||||
@@ -96,6 +96,22 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
|
||||
#region Properties
|
||||
|
||||
/// <summary>
|
||||
/// Delegate type for callback when a query completes or fails
|
||||
/// </summary>
|
||||
/// <param name="q">The query that completed</param>
|
||||
public delegate Task QueryAsyncEventHandler(Query q);
|
||||
|
||||
/// <summary>
|
||||
/// Callback for when the query has completed successfully
|
||||
/// </summary>
|
||||
public event QueryAsyncEventHandler QueryCompleted;
|
||||
|
||||
/// <summary>
|
||||
/// Callback for when the query has failed
|
||||
/// </summary>
|
||||
public event QueryAsyncEventHandler QueryFailed;
|
||||
|
||||
/// <summary>
|
||||
/// The batches underneath this query
|
||||
/// </summary>
|
||||
@@ -124,6 +140,8 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
}
|
||||
}
|
||||
|
||||
internal Task ExecutionTask { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Whether or not the query has completed executed, regardless of success or failure
|
||||
/// </summary>
|
||||
@@ -167,10 +185,44 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
cancellationSource.Cancel();
|
||||
}
|
||||
|
||||
public void Execute()
|
||||
{
|
||||
ExecutionTask = Task.Run(ExecuteInternal);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Retrieves a subset of the result sets
|
||||
/// </summary>
|
||||
/// <param name="batchIndex">The index for selecting the batch item</param>
|
||||
/// <param name="resultSetIndex">The index for selecting the result set</param>
|
||||
/// <param name="startRow">The starting row of the results</param>
|
||||
/// <param name="rowCount">How many rows to retrieve</param>
|
||||
/// <returns>A subset of results</returns>
|
||||
public Task<ResultSetSubset> GetSubset(int batchIndex, int resultSetIndex, int startRow, int rowCount)
|
||||
{
|
||||
// Sanity check that the results are available
|
||||
if (!HasExecuted)
|
||||
{
|
||||
throw new InvalidOperationException(SR.QueryServiceSubsetNotCompleted);
|
||||
}
|
||||
|
||||
// Sanity check to make sure that the batch is within bounds
|
||||
if (batchIndex < 0 || batchIndex >= Batches.Length)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(batchIndex), SR.QueryServiceSubsetBatchOutOfRange);
|
||||
}
|
||||
|
||||
return Batches[batchIndex].GetSubset(resultSetIndex, startRow, rowCount);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Private Helpers
|
||||
|
||||
/// <summary>
|
||||
/// Executes this query asynchronously and collects all result sets
|
||||
/// </summary>
|
||||
public async Task Execute()
|
||||
private async Task ExecuteInternal()
|
||||
{
|
||||
// Mark that we've internally executed
|
||||
hasExecuteBeenCalled = true;
|
||||
@@ -202,6 +254,20 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
{
|
||||
await b.Execute(conn, cancellationSource.Token);
|
||||
}
|
||||
|
||||
// Call the query execution callback
|
||||
if (QueryCompleted != null)
|
||||
{
|
||||
await QueryCompleted(this);
|
||||
}
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// Call the query failure callback
|
||||
if (QueryFailed != null)
|
||||
{
|
||||
await QueryFailed(this);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -227,7 +293,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
throw new InvalidOperationException(SR.QueryServiceMessageSenderNotSql);
|
||||
}
|
||||
|
||||
foreach(SqlError error in args.Errors)
|
||||
foreach (SqlError error in args.Errors)
|
||||
{
|
||||
// Did the database context change (error code 5701)?
|
||||
if (error.Number == DatabaseContextChangeErrorNumber)
|
||||
@@ -237,31 +303,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Retrieves a subset of the result sets
|
||||
/// </summary>
|
||||
/// <param name="batchIndex">The index for selecting the batch item</param>
|
||||
/// <param name="resultSetIndex">The index for selecting the result set</param>
|
||||
/// <param name="startRow">The starting row of the results</param>
|
||||
/// <param name="rowCount">How many rows to retrieve</param>
|
||||
/// <returns>A subset of results</returns>
|
||||
public Task<ResultSetSubset> GetSubset(int batchIndex, int resultSetIndex, int startRow, int rowCount)
|
||||
{
|
||||
// Sanity check that the results are available
|
||||
if (!HasExecuted)
|
||||
{
|
||||
throw new InvalidOperationException(SR.QueryServiceSubsetNotCompleted);
|
||||
}
|
||||
|
||||
// Sanity check to make sure that the batch is within bounds
|
||||
if (batchIndex < 0 || batchIndex >= Batches.Length)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(batchIndex), SR.QueryServiceSubsetBatchOutOfRange);
|
||||
}
|
||||
|
||||
return Batches[batchIndex].GetSubset(resultSetIndex, startRow, rowCount);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region IDisposable Implementation
|
||||
|
||||
@@ -129,19 +129,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
public async Task HandleExecuteRequest(QueryExecuteParams executeParams,
|
||||
RequestContext<QueryExecuteResult> requestContext)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get a query new active query
|
||||
Query newQuery = await CreateAndActivateNewQuery(executeParams, requestContext);
|
||||
// Get a query new active query
|
||||
Query newQuery = await CreateAndActivateNewQuery(executeParams, requestContext);
|
||||
|
||||
// Execute the query
|
||||
await ExecuteAndCompleteQuery(executeParams, requestContext, newQuery);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// Dump any unexpected exceptions as errors
|
||||
await requestContext.SendError(e.Message);
|
||||
}
|
||||
// Execute the query -- asynchronously
|
||||
await ExecuteAndCompleteQuery(executeParams, requestContext, newQuery);
|
||||
}
|
||||
|
||||
public async Task HandleResultSubsetRequest(QueryExecuteSubsetParams subsetParams,
|
||||
@@ -399,7 +391,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
{
|
||||
//get column name
|
||||
DbColumnWrapper col = selectedResultSet.Columns[i];
|
||||
string val = row[i]?.ToString();
|
||||
string val = row[i];
|
||||
jsonWriter.WritePropertyName(col.ColumnName);
|
||||
if (val == null)
|
||||
{
|
||||
@@ -440,10 +432,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
ConnectionInfo connectionInfo;
|
||||
if (!ConnectionService.TryFindConnection(executeParams.OwnerUri, out connectionInfo))
|
||||
{
|
||||
await requestContext.SendResult(new QueryExecuteResult
|
||||
{
|
||||
Messages = SR.QueryServiceQueryInvalidOwnerUri
|
||||
});
|
||||
await requestContext.SendError(SR.QueryServiceQueryInvalidOwnerUri);
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -488,24 +477,22 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
Query newQuery = new Query(queryText, connectionInfo, settings, BufferFileFactory);
|
||||
if (!ActiveQueries.TryAdd(executeParams.OwnerUri, newQuery))
|
||||
{
|
||||
await requestContext.SendResult(new QueryExecuteResult
|
||||
{
|
||||
Messages = SR.QueryServiceQueryInProgress
|
||||
});
|
||||
await requestContext.SendError(SR.QueryServiceQueryInProgress);
|
||||
newQuery.Dispose();
|
||||
return null;
|
||||
}
|
||||
|
||||
return newQuery;
|
||||
}
|
||||
catch (ArgumentException ane)
|
||||
catch (Exception e)
|
||||
{
|
||||
await requestContext.SendResult(new QueryExecuteResult { Messages = ane.Message });
|
||||
await requestContext.SendError(e.Message);
|
||||
return null;
|
||||
}
|
||||
// Any other exceptions will fall through here and be collected at the end
|
||||
}
|
||||
|
||||
private async Task ExecuteAndCompleteQuery(QueryExecuteParams executeParams, RequestContext<QueryExecuteResult> requestContext, Query query)
|
||||
private static async Task ExecuteAndCompleteQuery(QueryExecuteParams executeParams, RequestContext<QueryExecuteResult> requestContext, Query query)
|
||||
{
|
||||
// Skip processing if the query is null
|
||||
if (query == null)
|
||||
@@ -513,21 +500,29 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
|
||||
return;
|
||||
}
|
||||
|
||||
// Launch the query and respond with successfully launching it
|
||||
Task executeTask = query.Execute();
|
||||
// Setup the query completion/failure callbacks
|
||||
Query.QueryAsyncEventHandler callback = async q =>
|
||||
{
|
||||
// Send back the results
|
||||
QueryExecuteCompleteParams eventParams = new QueryExecuteCompleteParams
|
||||
{
|
||||
OwnerUri = executeParams.OwnerUri,
|
||||
BatchSummaries = q.BatchSummaries
|
||||
};
|
||||
await requestContext.SendEvent(QueryExecuteCompleteEvent.Type, eventParams);
|
||||
};
|
||||
|
||||
query.QueryCompleted += callback;
|
||||
query.QueryFailed += callback;
|
||||
|
||||
// Launch this as an asynchronous task
|
||||
query.Execute();
|
||||
|
||||
// Send back a result showing we were successful
|
||||
await requestContext.SendResult(new QueryExecuteResult
|
||||
{
|
||||
Messages = null
|
||||
});
|
||||
|
||||
// Wait for query execution and then send back the results
|
||||
await Task.WhenAll(executeTask);
|
||||
QueryExecuteCompleteParams eventParams = new QueryExecuteCompleteParams
|
||||
{
|
||||
OwnerUri = executeParams.OwnerUri,
|
||||
BatchSummaries = query.BatchSummaries
|
||||
};
|
||||
await requestContext.SendEvent(QueryExecuteCompleteEvent.Type, eventParams);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
Reference in New Issue
Block a user