// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. // using System; using System.Collections.Concurrent; using System.Data.Common; using System.Threading.Tasks; using Microsoft.SqlTools.ServiceLayer.Connection; using Microsoft.SqlTools.ServiceLayer.Hosting; using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol; using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts; namespace Microsoft.SqlTools.ServiceLayer.QueryExecution { /// /// Service for executing queries /// public sealed class QueryExecutionService : IDisposable { #region Singleton Instance Implementation private static readonly Lazy instance = new Lazy(() => new QueryExecutionService()); public static QueryExecutionService Instance { get { return instance.Value; } } private QueryExecutionService() { ConnectionService = ConnectionService.Instance; } internal QueryExecutionService(ConnectionService connService) { ConnectionService = connService; } #endregion #region Properties /// /// The collection of active queries /// internal ConcurrentDictionary ActiveQueries { get { return queries.Value; } } /// /// Instance of the connection service, used to get the connection info for a given owner URI /// private ConnectionService ConnectionService { get; set; } /// /// Internal storage of active queries, lazily constructed as a threadsafe dictionary /// private readonly Lazy> queries = new Lazy>(() => new ConcurrentDictionary()); #endregion /// /// Initializes the service with the service host, registers request handlers and shutdown /// event handler. /// /// The service host instance to register with public void InitializeService(ServiceHost serviceHost) { // Register handlers for requests serviceHost.SetRequestHandler(QueryExecuteRequest.Type, HandleExecuteRequest); serviceHost.SetRequestHandler(QueryExecuteSubsetRequest.Type, HandleResultSubsetRequest); serviceHost.SetRequestHandler(QueryDisposeRequest.Type, HandleDisposeRequest); // Register handler for shutdown event serviceHost.RegisterShutdownTask((shutdownParams, requestContext) => { Dispose(); return Task.FromResult(0); }); } #region Request Handlers public async Task HandleExecuteRequest(QueryExecuteParams executeParams, RequestContext requestContext) { try { // Get a query new active query Query newQuery = await CreateAndActivateNewQuery(executeParams, requestContext); // Execute the query await ExecuteAndCompleteQuery(executeParams, requestContext, newQuery); } catch (Exception e) { // Dump any unexpected exceptions as errors await requestContext.SendError(e.Message); } } public async Task HandleResultSubsetRequest(QueryExecuteSubsetParams subsetParams, RequestContext requestContext) { try { // Attempt to load the query Query query; if (!ActiveQueries.TryGetValue(subsetParams.OwnerUri, out query)) { await requestContext.SendResult(new QueryExecuteSubsetResult { Message = "The requested query does not exist." }); return; } // Retrieve the requested subset and return it var result = new QueryExecuteSubsetResult { Message = null, ResultSubset = query.GetSubset( subsetParams.ResultSetIndex, subsetParams.RowsStartIndex, subsetParams.RowsCount) }; await requestContext.SendResult(result); } catch (InvalidOperationException ioe) { // Return the error as a result await requestContext.SendResult(new QueryExecuteSubsetResult { Message = ioe.Message }); } catch (ArgumentOutOfRangeException aoore) { // Return the error as a result await requestContext.SendResult(new QueryExecuteSubsetResult { Message = aoore.Message }); } catch (Exception e) { // This was unexpected, so send back as error await requestContext.SendError(e.Message); } } public async Task HandleDisposeRequest(QueryDisposeParams disposeParams, RequestContext requestContext) { try { // Attempt to remove the query for the owner uri Query result; if (!ActiveQueries.TryRemove(disposeParams.OwnerUri, out result)) { await requestContext.SendResult(new QueryDisposeResult { Messages = "Failed to dispose query, ID not found." }); return; } // Success await requestContext.SendResult(new QueryDisposeResult { Messages = null }); } catch (Exception e) { await requestContext.SendError(e.Message); } } #endregion #region Private Helpers private async Task CreateAndActivateNewQuery(QueryExecuteParams executeParams, RequestContext requestContext) { try { // Attempt to get the connection for the editor ConnectionInfo connectionInfo; if (!ConnectionService.TryFindConnection(executeParams.OwnerUri, out connectionInfo)) { await requestContext.SendResult(new QueryExecuteResult { Messages = "This editor is not connected to a database." }); return null; } // Attempt to clean out any old query on the owner URI Query oldQuery; if (ActiveQueries.TryGetValue(executeParams.OwnerUri, out oldQuery) && oldQuery.HasExecuted) { ActiveQueries.TryRemove(executeParams.OwnerUri, out oldQuery); } // If we can't add the query now, it's assumed the query is in progress Query newQuery = new Query(executeParams.QueryText, connectionInfo); if (!ActiveQueries.TryAdd(executeParams.OwnerUri, newQuery)) { await requestContext.SendResult(new QueryExecuteResult { Messages = "A query is already in progress for this editor session." + "Please cancel this query or wait for its completion." }); return null; } return newQuery; } catch (ArgumentNullException ane) { await requestContext.SendResult(new QueryExecuteResult { Messages = ane.Message }); return null; } // Any other exceptions will fall through here and be collected at the end } private async Task ExecuteAndCompleteQuery(QueryExecuteParams executeParams, RequestContext requestContext, Query query) { // Skip processing if the query is null if (query == null) { return; } // Launch the query and respond with successfully launching it Task executeTask = query.Execute(); await requestContext.SendResult(new QueryExecuteResult { Messages = null }); // Wait for query execution and then send back the results await Task.WhenAll(executeTask); QueryExecuteCompleteParams eventParams = new QueryExecuteCompleteParams { HasError = query.HasError, Messages = query.ResultMessages.ToArray(), OwnerUri = executeParams.OwnerUri, ResultSetSummaries = query.ResultSummary }; await requestContext.SendEvent(QueryExecuteCompleteEvent.Type, eventParams); } #endregion #region IDisposable Implementation private bool disposed; public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } private void Dispose(bool disposing) { if (disposed) { return; } if (disposing) { foreach (var query in ActiveQueries) { query.Value.Dispose(); } } disposed = true; } ~QueryExecutionService() { Dispose(false); } #endregion } }