Inter-Service API for executing queries (#223)

Adding new methods for executing queries from other services (such as the upcoming edit data service). The code is written to avoid duplicating logic by using lambdas to perform custom logic.
Additionally, the service host protocol has been slightly modified to split the IMessageSender into IEventSender and IRequestSender. This allows us to use either a ServiceHost or any RequestContext<T> to send events. It becomes very convenient to use another service's request context to send the events for query execution.

**Breaking Change**: This removes the messages property for query dispose results and instead elects to use error for any errors encountered during query disposal. A result is only used when something is successful.

* Splitting IMessageSender into IEventSender and IRequestSender

* Adding inter-service method for executing queries

* Adding inter-service method for disposing of a query

* Adding null checking for the success/error handlers
This commit is contained in:
Benjamin Russell
2017-02-02 17:05:10 -08:00
committed by GitHub
parent 8c6014b81c
commit 54c43f950f
9 changed files with 238 additions and 93 deletions

View File

@@ -87,17 +87,14 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// <summary>
/// The collection of active queries
/// </summary>
internal ConcurrentDictionary<string, Query> ActiveQueries
{
get { return queries.Value; }
}
internal ConcurrentDictionary<string, Query> ActiveQueries => queries.Value;
/// <summary>
/// Instance of the connection service, used to get the connection info for a given owner URI
/// </summary>
private ConnectionService ConnectionService { get; set; }
private ConnectionService ConnectionService { get; }
private WorkspaceService<SqlToolsSettings> WorkspaceService { get; set; }
private WorkspaceService<SqlToolsSettings> WorkspaceService { get; }
/// <summary>
/// Internal storage of active queries, lazily constructed as a threadsafe dictionary
@@ -105,7 +102,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
private readonly Lazy<ConcurrentDictionary<string, Query>> queries =
new Lazy<ConcurrentDictionary<string, Query>>(() => new ConcurrentDictionary<string, Query>());
private SqlToolsSettings Settings { get { return WorkspaceService<SqlToolsSettings>.Instance.CurrentSettings; } }
private SqlToolsSettings Settings => WorkspaceService<SqlToolsSettings>.Instance.CurrentSettings;
#endregion
@@ -146,20 +143,21 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// <summary>
/// Handles request to execute a selection of a document in the workspace service
/// </summary>
public async Task HandleExecuteRequest(ExecuteRequestParamsBase executeDocumentSelectionParams,
internal Task HandleExecuteRequest(ExecuteRequestParamsBase executeParams,
RequestContext<ExecuteRequestResult> requestContext)
{
// Get a query new active query
Query newQuery = await CreateAndActivateNewQuery(executeDocumentSelectionParams, requestContext);
// Setup actions to perform upon successful start and on failure to start
Func<Task> queryCreationAction = () => requestContext.SendResult(new ExecuteRequestResult());
Func<string, Task> queryFailAction = requestContext.SendError;
// Execute the query -- asynchronously
ExecuteAndCompleteQuery(executeDocumentSelectionParams, requestContext, newQuery);
// Use the internal handler to launch the query
return InterServiceExecuteQuery(executeParams, requestContext, queryCreationAction, queryFailAction);
}
/// <summary>
/// Handles a request to get a subset of the results of this query
/// </summary>
public async Task HandleResultSubsetRequest(SubsetParams subsetParams,
internal async Task HandleResultSubsetRequest(SubsetParams subsetParams,
RequestContext<SubsetResult> requestContext)
{
try
@@ -210,7 +208,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// <summary>
/// Handles a request to get an execution plan
/// </summary>
public async Task HandleExecutionPlanRequest(QueryExecutionPlanParams planParams,
internal async Task HandleExecutionPlanRequest(QueryExecutionPlanParams planParams,
RequestContext<QueryExecutionPlanResult> requestContext)
{
try
@@ -240,41 +238,21 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// <summary>
/// Handles a request to dispose of this query
/// </summary>
public async Task HandleDisposeRequest(QueryDisposeParams disposeParams,
internal async Task HandleDisposeRequest(QueryDisposeParams disposeParams,
RequestContext<QueryDisposeResult> requestContext)
{
try
{
// Attempt to remove the query for the owner uri
Query result;
if (!ActiveQueries.TryRemove(disposeParams.OwnerUri, out result))
{
await requestContext.SendResult(new QueryDisposeResult
{
Messages = SR.QueryServiceRequestsNoQuery
});
return;
}
// Setup action for success and failure
Func<Task> successAction = () => requestContext.SendResult(new QueryDisposeResult());
Func<string, Task> failureAction = requestContext.SendError;
// Cleanup the query
result.Dispose();
// Success
await requestContext.SendResult(new QueryDisposeResult
{
Messages = null
});
}
catch (Exception e)
{
await requestContext.SendError(e.Message);
}
// Use the inter-service dispose functionality
await InterServiceDisposeQuery(disposeParams.OwnerUri, successAction, failureAction);
}
/// <summary>
/// Handles a request to cancel this query if it is in progress
/// </summary>
public async Task HandleCancelRequest(QueryCancelParams cancelParams,
internal async Task HandleCancelRequest(QueryCancelParams cancelParams,
RequestContext<QueryCancelResult> requestContext)
{
try
@@ -338,9 +316,75 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
#endregion
#region Inter-Service API Handlers
/// <summary>
/// Query execution meant to be called from another service. Utilizes callbacks to allow
/// custom actions to be taken upon creation of query and failure to create query.
/// </summary>
/// <param name="executeParams">Params for creating the new query</param>
/// <param name="eventSender">Object that can send events for query execution progress</param>
/// <param name="queryCreatedAction">
/// Action to perform when query has been successfully created, right before execution of
/// the query
/// </param>
/// <param name="failureAction">Action to perform if query was not successfully created</param>
public async Task InterServiceExecuteQuery(ExecuteRequestParamsBase executeParams, IEventSender eventSender,
Func<Task> queryCreatedAction, Func<string, Task> failureAction)
{
Validate.IsNotNull(nameof(executeParams), executeParams);
Validate.IsNotNull(nameof(eventSender), eventSender);
Validate.IsNotNull(nameof(queryCreatedAction), queryCreatedAction);
Validate.IsNotNull(nameof(failureAction), failureAction);
// Get a new active query
Query newQuery = await CreateAndActivateNewQuery(executeParams, queryCreatedAction, failureAction);
// Execute the query asynchronously
ExecuteAndCompleteQuery(executeParams.OwnerUri, eventSender, newQuery);
}
/// <summary>
/// Query disposal meant to be called from another service. Utilizes callbacks to allow
/// custom actions to be performed on success or failure.
/// </summary>
/// <param name="ownerUri">The identifier of the query to be disposed</param>
/// <param name="successAction">Action to perform on success</param>
/// <param name="failureAction">Action to perform on failure</param>
/// <returns></returns>
public async Task InterServiceDisposeQuery(string ownerUri, Func<Task> successAction,
Func<string, Task> failureAction)
{
Validate.IsNotNull(nameof(successAction), successAction);
Validate.IsNotNull(nameof(failureAction), failureAction);
try
{
// Attempt to remove the query for the owner uri
Query result;
if (!ActiveQueries.TryRemove(ownerUri, out result))
{
await failureAction(SR.QueryServiceRequestsNoQuery);
return;
}
// Cleanup the query
result.Dispose();
// Success
await successAction();
}
catch (Exception e)
{
await failureAction(e.Message);
}
}
#endregion
#region Private Helpers
private async Task<Query> CreateAndActivateNewQuery(ExecuteRequestParamsBase executeParams, RequestContext<ExecuteRequestResult> requestContext)
private async Task<Query> CreateAndActivateNewQuery(ExecuteRequestParamsBase executeParams, Func<Task> successAction, Func<string, Task> failureAction)
{
try
{
@@ -348,7 +392,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
ConnectionInfo connectionInfo;
if (!ConnectionService.TryFindConnection(executeParams.OwnerUri, out connectionInfo))
{
await requestContext.SendError(SR.QueryServiceQueryInvalidOwnerUri);
await failureAction(SR.QueryServiceQueryInvalidOwnerUri);
return null;
}
@@ -370,24 +414,24 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
Query newQuery = new Query(GetSqlText(executeParams), connectionInfo, settings, BufferFileFactory);
if (!ActiveQueries.TryAdd(executeParams.OwnerUri, newQuery))
{
await requestContext.SendError(SR.QueryServiceQueryInProgress);
await failureAction(SR.QueryServiceQueryInProgress);
newQuery.Dispose();
return null;
}
// Send the result stating that the query was successfully started
await requestContext.SendResult(new ExecuteRequestResult());
// Successfully created query
await successAction();
return newQuery;
}
catch (Exception e)
{
await requestContext.SendError(e.Message);
await failureAction(e.Message);
return null;
}
}
private static void ExecuteAndCompleteQuery(ExecuteRequestParamsBase executeDocumentSelectionParams, RequestContext<ExecuteRequestResult> requestContext, Query query)
private static void ExecuteAndCompleteQuery(string ownerUri, IEventSender eventSender, Query query)
{
// Skip processing if the query is null
if (query == null)
@@ -401,11 +445,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
// Send back the results
QueryCompleteParams eventParams = new QueryCompleteParams
{
OwnerUri = executeDocumentSelectionParams.OwnerUri,
OwnerUri = ownerUri,
BatchSummaries = q.BatchSummaries
};
await requestContext.SendEvent(QueryCompleteEvent.Type, eventParams);
await eventSender.SendEvent(QueryCompleteEvent.Type, eventParams);
};
Query.QueryAsyncErrorEventHandler errorCallback = async errorMessage =>
@@ -413,10 +457,10 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
// Send back the error message
QueryCompleteParams eventParams = new QueryCompleteParams
{
OwnerUri = executeDocumentSelectionParams.OwnerUri,
OwnerUri = ownerUri,
//Message = errorMessage
};
await requestContext.SendEvent(QueryCompleteEvent.Type, eventParams);
await eventSender.SendEvent(QueryCompleteEvent.Type, eventParams);
};
query.QueryCompleted += callback;
@@ -429,10 +473,10 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
BatchEventParams eventParams = new BatchEventParams
{
BatchSummary = b.Summary,
OwnerUri = executeDocumentSelectionParams.OwnerUri
OwnerUri = ownerUri
};
await requestContext.SendEvent(BatchStartEvent.Type, eventParams);
await eventSender.SendEvent(BatchStartEvent.Type, eventParams);
};
query.BatchStarted += batchStartCallback;
@@ -441,10 +485,10 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
BatchEventParams eventParams = new BatchEventParams
{
BatchSummary = b.Summary,
OwnerUri = executeDocumentSelectionParams.OwnerUri
OwnerUri = ownerUri
};
await requestContext.SendEvent(BatchCompleteEvent.Type, eventParams);
await eventSender.SendEvent(BatchCompleteEvent.Type, eventParams);
};
query.BatchCompleted += batchCompleteCallback;
@@ -453,9 +497,9 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
MessageParams eventParams = new MessageParams
{
Message = m,
OwnerUri = executeDocumentSelectionParams.OwnerUri
OwnerUri = ownerUri
};
await requestContext.SendEvent(MessageEvent.Type, eventParams);
await eventSender.SendEvent(MessageEvent.Type, eventParams);
};
query.BatchMessageSent += batchMessageCallback;
@@ -465,9 +509,9 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
ResultSetEventParams eventParams = new ResultSetEventParams
{
ResultSetSummary = r.Summary,
OwnerUri = executeDocumentSelectionParams.OwnerUri
OwnerUri = ownerUri
};
await requestContext.SendEvent(ResultSetCompleteEvent.Type, eventParams);
await eventSender.SendEvent(ResultSetCompleteEvent.Type, eventParams);
};
query.ResultSetCompleted += resultCallback;