// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. // using System; using System.Collections.Concurrent; using System.IO; using System.Threading.Tasks; using Microsoft.SqlTools.ServiceLayer.Connection; using Microsoft.SqlTools.Hosting.Protocol; using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts; using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts.ExecuteRequests; using Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage; using Microsoft.SqlTools.ServiceLayer.SqlContext; using Microsoft.SqlTools.ServiceLayer.Workspace; using Microsoft.SqlTools.ServiceLayer.Workspace.Contracts; using Microsoft.SqlTools.Utility; using Microsoft.SqlTools.ServiceLayer.Hosting; namespace Microsoft.SqlTools.ServiceLayer.QueryExecution { /// /// Service for executing queries /// public sealed class QueryExecutionService : IDisposable { #region Singleton Instance Implementation private static readonly Lazy LazyInstance = new Lazy(() => new QueryExecutionService()); /// /// Singleton instance of the query execution service /// public static QueryExecutionService Instance => LazyInstance.Value; private QueryExecutionService() { ConnectionService = ConnectionService.Instance; WorkspaceService = WorkspaceService.Instance; Settings = new SqlToolsSettings(); } internal QueryExecutionService(ConnectionService connService, WorkspaceService workspaceService) { ConnectionService = connService; WorkspaceService = workspaceService; Settings = new SqlToolsSettings(); } #endregion #region Properties /// /// File factory to be used to create a buffer file for results. /// /// /// Made internal here to allow for overriding in unit testing /// internal IFileStreamFactory BufferFileStreamFactory; /// /// File factory to be used to create a buffer file for results /// private IFileStreamFactory BufferFileFactory { get { if (BufferFileStreamFactory == null) { BufferFileStreamFactory = new ServiceBufferFileStreamFactory { ExecutionSettings = Settings.QueryExecutionSettings }; } return BufferFileStreamFactory; } } /// /// File factory to be used to create CSV files from result sets. Set to internal in order /// to allow overriding in unit testing /// internal IFileStreamFactory CsvFileFactory { get; set; } /// /// File factory to be used to create Excel files from result sets. Set to internal in order /// to allow overriding in unit testing /// internal IFileStreamFactory ExcelFileFactory { get; set; } /// /// File factory to be used to create JSON files from result sets. Set to internal in order /// to allow overriding in unit testing /// internal IFileStreamFactory JsonFileFactory { get; set; } /// /// The collection of active queries /// internal ConcurrentDictionary ActiveQueries => queries.Value; /// /// Instance of the connection service, used to get the connection info for a given owner URI /// private ConnectionService ConnectionService { get; } private WorkspaceService WorkspaceService { get; } /// /// Internal storage of active queries, lazily constructed as a threadsafe dictionary /// private readonly Lazy> queries = new Lazy>(() => new ConcurrentDictionary()); /// /// Settings that will be used to execute queries. Internal for unit testing /// internal SqlToolsSettings Settings { get; set; } #endregion /// /// Initializes the service with the service host, registers request handlers and shutdown /// event handler. /// /// The service host instance to register with public void InitializeService(ServiceHost serviceHost) { // Register handlers for requests serviceHost.SetRequestHandler(ExecuteDocumentSelectionRequest.Type, HandleExecuteRequest); serviceHost.SetRequestHandler(ExecuteStringRequest.Type, HandleExecuteRequest); serviceHost.SetRequestHandler(SubsetRequest.Type, HandleResultSubsetRequest); serviceHost.SetRequestHandler(QueryDisposeRequest.Type, HandleDisposeRequest); serviceHost.SetRequestHandler(QueryCancelRequest.Type, HandleCancelRequest); serviceHost.SetRequestHandler(SaveResultsAsCsvRequest.Type, HandleSaveResultsAsCsvRequest); serviceHost.SetRequestHandler(SaveResultsAsExcelRequest.Type, HandleSaveResultsAsExcelRequest); serviceHost.SetRequestHandler(SaveResultsAsJsonRequest.Type, HandleSaveResultsAsJsonRequest); serviceHost.SetRequestHandler(QueryExecutionPlanRequest.Type, HandleExecutionPlanRequest); serviceHost.SetRequestHandler(SimpleExecuteRequest.Type, HandleSimpleExecuteRequest); // Register handler for shutdown event serviceHost.RegisterShutdownTask((shutdownParams, requestContext) => { Dispose(); return Task.FromResult(0); }); // Register a handler for when the configuration changes WorkspaceService.RegisterConfigChangeCallback(UpdateSettings); } #region Request Handlers /// /// Handles request to execute a selection of a document in the workspace service /// internal Task HandleExecuteRequest(ExecuteRequestParamsBase executeParams, RequestContext requestContext) { // Setup actions to perform upon successful start and on failure to start Func> queryCreateSuccessAction = async q => { await requestContext.SendResult(new ExecuteRequestResult()); return true; }; Func queryCreateFailureAction = message => requestContext.SendError(message); // Use the internal handler to launch the query return InterServiceExecuteQuery(executeParams, null, requestContext, queryCreateSuccessAction, queryCreateFailureAction, null, null); } /// /// Handles a request to execute a string and return the result /// internal Task HandleSimpleExecuteRequest(SimpleExecuteParams executeParams, RequestContext requestContext) { ExecuteStringParams executeStringParams = new ExecuteStringParams { Query = executeParams.QueryString, // generate guid as the owner uri to make sure every query is unique OwnerUri = Guid.NewGuid().ToString() }; // get connection ConnectionInfo connInfo; if (!ConnectionService.TryFindConnection(executeParams.OwnerUri, out connInfo)) { return requestContext.SendError(SR.QueryServiceQueryInvalidOwnerUri); } if (connInfo.ConnectionDetails.MultipleActiveResultSets == null || connInfo.ConnectionDetails.MultipleActiveResultSets == false) { // if multipleActive result sets is not allowed, don't specific a connection and make the ownerURI the true owneruri connInfo = null; executeStringParams.OwnerUri = executeParams.OwnerUri; } Func queryCreateFailureAction = message => requestContext.SendError(message); ResultOnlyContext newContext = new ResultOnlyContext(requestContext); // handle sending event back when the query completes Query.QueryAsyncEventHandler queryComplete = async q => { Query removedQuery; // check to make sure any results were recieved if (q.Batches.Length == 0 || q.Batches[0].ResultSets.Count == 0) { await requestContext.SendError(SR.QueryServiceResultSetHasNoResults); ActiveQueries.TryRemove(executeStringParams.OwnerUri, out removedQuery); return; } var rowCount = q.Batches[0].ResultSets[0].RowCount; // check to make sure there is a safe amount of rows to load into memory if (rowCount > Int32.MaxValue) { await requestContext.SendError(SR.QueryServiceResultSetTooLarge); ActiveQueries.TryRemove(executeStringParams.OwnerUri, out removedQuery); return; } SubsetParams subsetRequestParams = new SubsetParams { OwnerUri = executeStringParams.OwnerUri, BatchIndex = 0, ResultSetIndex = 0, RowsStartIndex = 0, RowsCount = Convert.ToInt32(rowCount) }; // get the data to send back ResultSetSubset subset = await InterServiceResultSubset(subsetRequestParams); SimpleExecuteResult result = new SimpleExecuteResult { RowCount = q.Batches[0].ResultSets[0].RowCount, ColumnInfo = q.Batches[0].ResultSets[0].Columns, Rows = subset.Rows }; await requestContext.SendResult(result); // remove the active query since we are done with it ActiveQueries.TryRemove(executeStringParams.OwnerUri, out removedQuery); }; // handle sending error back when query fails Query.QueryAsyncErrorEventHandler queryFail = async (q, e) => { await requestContext.SendError(e); }; return InterServiceExecuteQuery(executeStringParams, connInfo, newContext, null, queryCreateFailureAction, queryComplete, queryFail); } /// /// Handles a request to get a subset of the results of this query /// internal async Task HandleResultSubsetRequest(SubsetParams subsetParams, RequestContext requestContext) { try { ResultSetSubset subset = await InterServiceResultSubset(subsetParams); var result = new SubsetResult { ResultSubset = subset }; await requestContext.SendResult(result); } catch (Exception e) { // This was unexpected, so send back as error await requestContext.SendError(e.Message); } } /// /// Handles a request to get an execution plan /// internal async Task HandleExecutionPlanRequest(QueryExecutionPlanParams planParams, RequestContext requestContext) { try { // Attempt to load the query Query query; if (!ActiveQueries.TryGetValue(planParams.OwnerUri, out query)) { await requestContext.SendError(SR.QueryServiceRequestsNoQuery); return; } // Retrieve the requested execution plan and return it var result = new QueryExecutionPlanResult { ExecutionPlan = await query.GetExecutionPlan(planParams.BatchIndex, planParams.ResultSetIndex) }; await requestContext.SendResult(result); } catch (Exception e) { // This was unexpected, so send back as error await requestContext.SendError(e.Message); } } /// /// Handles a request to dispose of this query /// internal async Task HandleDisposeRequest(QueryDisposeParams disposeParams, RequestContext requestContext) { // Setup action for success and failure Func successAction = () => requestContext.SendResult(new QueryDisposeResult()); Func failureAction = message => requestContext.SendError(message); // Use the inter-service dispose functionality await InterServiceDisposeQuery(disposeParams.OwnerUri, successAction, failureAction); } /// /// Handles a request to cancel this query if it is in progress /// internal async Task HandleCancelRequest(QueryCancelParams cancelParams, RequestContext requestContext) { try { // Attempt to find the query for the owner uri Query result; if (!ActiveQueries.TryGetValue(cancelParams.OwnerUri, out result)) { await requestContext.SendResult(new QueryCancelResult { Messages = SR.QueryServiceRequestsNoQuery }); return; } // Cancel the query and send a success message result.Cancel(); await requestContext.SendResult(new QueryCancelResult()); } catch (InvalidOperationException e) { // If this exception occurred, we most likely were trying to cancel a completed query await requestContext.SendResult(new QueryCancelResult { Messages = e.Message }); } catch (Exception e) { await requestContext.SendError(e.Message); } } /// /// Process request to save a resultSet to a file in CSV format /// internal async Task HandleSaveResultsAsCsvRequest(SaveResultsAsCsvRequestParams saveParams, RequestContext requestContext) { // Use the default CSV file factory if we haven't overridden it IFileStreamFactory csvFactory = CsvFileFactory ?? new SaveAsCsvFileStreamFactory { SaveRequestParams = saveParams, QueryExecutionSettings = Settings.QueryExecutionSettings }; await SaveResultsHelper(saveParams, requestContext, csvFactory); } /// /// Process request to save a resultSet to a file in Excel format /// internal async Task HandleSaveResultsAsExcelRequest(SaveResultsAsExcelRequestParams saveParams, RequestContext requestContext) { // Use the default Excel file factory if we haven't overridden it IFileStreamFactory excelFactory = ExcelFileFactory ?? new SaveAsExcelFileStreamFactory { SaveRequestParams = saveParams, QueryExecutionSettings = Settings.QueryExecutionSettings }; await SaveResultsHelper(saveParams, requestContext, excelFactory); } /// /// Process request to save a resultSet to a file in JSON format /// internal async Task HandleSaveResultsAsJsonRequest(SaveResultsAsJsonRequestParams saveParams, RequestContext requestContext) { // Use the default JSON file factory if we haven't overridden it IFileStreamFactory jsonFactory = JsonFileFactory ?? new SaveAsJsonFileStreamFactory { SaveRequestParams = saveParams, QueryExecutionSettings = Settings.QueryExecutionSettings }; await SaveResultsHelper(saveParams, requestContext, jsonFactory); } #endregion #region Inter-Service API Handlers /// /// Query execution meant to be called from another service. Utilizes callbacks to allow /// custom actions to be taken upon creation of query and failure to create query. /// /// Parameters for execution /// Connection Info to use; will try and get the connection from owneruri if not provided /// Event sender that will send progressive events during execution of the query /// /// Callback for when query has been created successfully. If result is true, query /// will be executed asynchronously. If result is false, query will be disposed. May /// be null /// /// /// Callback for when query failed to be created successfully. Error message is provided. /// May be null. /// /// /// Callback to call when query has completed execution successfully. May be null. /// /// /// Callback to call when query has completed execution with errors. May be null. /// public async Task InterServiceExecuteQuery(ExecuteRequestParamsBase executeParams, ConnectionInfo connInfo, IEventSender queryEventSender, Func> queryCreateSuccessFunc, Func queryCreateFailFunc, Query.QueryAsyncEventHandler querySuccessFunc, Query.QueryAsyncErrorEventHandler queryFailureFunc) { Validate.IsNotNull(nameof(executeParams), executeParams); Validate.IsNotNull(nameof(queryEventSender), queryEventSender); Query newQuery; try { // Get a new active query newQuery = CreateQuery(executeParams, connInfo); if (queryCreateSuccessFunc != null && !await queryCreateSuccessFunc(newQuery)) { // The callback doesn't want us to continue, for some reason // It's ok if we leave the query behind in the active query list, the next call // to execute will replace it. newQuery.Dispose(); return; } } catch (Exception e) { // Call the failure callback if it was provided if (queryCreateFailFunc != null) { await queryCreateFailFunc(e.Message); } return; } // Execute the query asynchronously ExecuteAndCompleteQuery(executeParams.OwnerUri, newQuery, queryEventSender, querySuccessFunc, queryFailureFunc); } /// /// Query disposal meant to be called from another service. Utilizes callbacks to allow /// custom actions to be performed on success or failure. /// /// The identifier of the query to be disposed /// Action to perform on success /// Action to perform on failure public async Task InterServiceDisposeQuery(string ownerUri, Func successAction, Func failureAction) { Validate.IsNotNull(nameof(successAction), successAction); Validate.IsNotNull(nameof(failureAction), failureAction); try { // Attempt to remove the query for the owner uri Query result; if (!ActiveQueries.TryRemove(ownerUri, out result)) { await failureAction(SR.QueryServiceRequestsNoQuery); return; } // Cleanup the query result.Dispose(); // Success await successAction(); } catch (Exception e) { await failureAction(e.Message); } } /// /// Retrieves the requested subset of rows from the requested result set. Intended to be /// called by another service. /// /// Parameters for the subset to retrieve /// The requested subset /// The requested query does not exist public async Task InterServiceResultSubset(SubsetParams subsetParams) { Validate.IsNotNullOrEmptyString(nameof(subsetParams.OwnerUri), subsetParams.OwnerUri); // Attempt to load the query Query query; if (!ActiveQueries.TryGetValue(subsetParams.OwnerUri, out query)) { throw new ArgumentOutOfRangeException(SR.QueryServiceRequestsNoQuery); } // Retrieve the requested subset and return it return await query.GetSubset(subsetParams.BatchIndex, subsetParams.ResultSetIndex, subsetParams.RowsStartIndex, subsetParams.RowsCount); } #endregion #region Private Helpers private Query CreateQuery(ExecuteRequestParamsBase executeParams, ConnectionInfo connInfo) { // Attempt to get the connection for the editor ConnectionInfo connectionInfo; if (connInfo != null) { connectionInfo = connInfo; } else if (!ConnectionService.TryFindConnection(executeParams.OwnerUri, out connectionInfo)) { throw new ArgumentOutOfRangeException(nameof(executeParams.OwnerUri), SR.QueryServiceQueryInvalidOwnerUri); } // Attempt to clean out any old query on the owner URI Query oldQuery; if (ActiveQueries.TryGetValue(executeParams.OwnerUri, out oldQuery) && oldQuery.HasExecuted) { oldQuery.Dispose(); ActiveQueries.TryRemove(executeParams.OwnerUri, out oldQuery); } // Retrieve the current settings for executing the query with QueryExecutionSettings settings = Settings.QueryExecutionSettings; // Apply execution parameter settings settings.ExecutionPlanOptions = executeParams.ExecutionPlanOptions; // If we can't add the query now, it's assumed the query is in progress Query newQuery = new Query(GetSqlText(executeParams), connectionInfo, settings, BufferFileFactory); if (!ActiveQueries.TryAdd(executeParams.OwnerUri, newQuery)) { newQuery.Dispose(); throw new InvalidOperationException(SR.QueryServiceQueryInProgress); } return newQuery; } private static void ExecuteAndCompleteQuery(string ownerUri, Query query, IEventSender eventSender, Query.QueryAsyncEventHandler querySuccessCallback, Query.QueryAsyncErrorEventHandler queryFailureCallback) { // Setup the callback to send the complete event Query.QueryAsyncEventHandler completeCallback = async q => { // Send back the results QueryCompleteParams eventParams = new QueryCompleteParams { OwnerUri = ownerUri, BatchSummaries = q.BatchSummaries }; await eventSender.SendEvent(QueryCompleteEvent.Type, eventParams); }; // Setup the callback to send the complete event Query.QueryAsyncErrorEventHandler failureCallback = async (q, e) => { // Send back the results QueryCompleteParams eventParams = new QueryCompleteParams { OwnerUri = ownerUri, BatchSummaries = q.BatchSummaries }; await eventSender.SendEvent(QueryCompleteEvent.Type, eventParams); }; query.QueryCompleted += completeCallback; query.QueryFailed += failureCallback; // Add the callbacks that were provided by the caller // If they're null, that's no problem query.QueryCompleted += querySuccessCallback; query.QueryFailed += queryFailureCallback; // Setup the batch callbacks Batch.BatchAsyncEventHandler batchStartCallback = async b => { BatchEventParams eventParams = new BatchEventParams { BatchSummary = b.Summary, OwnerUri = ownerUri }; await eventSender.SendEvent(BatchStartEvent.Type, eventParams); }; query.BatchStarted += batchStartCallback; Batch.BatchAsyncEventHandler batchCompleteCallback = async b => { BatchEventParams eventParams = new BatchEventParams { BatchSummary = b.Summary, OwnerUri = ownerUri }; await eventSender.SendEvent(BatchCompleteEvent.Type, eventParams); }; query.BatchCompleted += batchCompleteCallback; Batch.BatchAsyncMessageHandler batchMessageCallback = async m => { MessageParams eventParams = new MessageParams { Message = m, OwnerUri = ownerUri }; await eventSender.SendEvent(MessageEvent.Type, eventParams); }; query.BatchMessageSent += batchMessageCallback; // Setup the ResultSet completion callback ResultSet.ResultSetAsyncEventHandler resultCallback = async r => { ResultSetEventParams eventParams = new ResultSetEventParams { ResultSetSummary = r.Summary, OwnerUri = ownerUri }; await eventSender.SendEvent(ResultSetCompleteEvent.Type, eventParams); }; query.ResultSetCompleted += resultCallback; // Launch this as an asynchronous task query.Execute(); } private async Task SaveResultsHelper(SaveResultsRequestParams saveParams, RequestContext requestContext, IFileStreamFactory fileFactory) { // retrieve query for OwnerUri Query query; if (!ActiveQueries.TryGetValue(saveParams.OwnerUri, out query)) { await requestContext.SendError(SR.QueryServiceQueryInvalidOwnerUri); return; } //Setup the callback for completion of the save task ResultSet.SaveAsAsyncEventHandler successHandler = async parameters => { await requestContext.SendResult(new SaveResultRequestResult()); }; ResultSet.SaveAsFailureAsyncEventHandler errorHandler = async (parameters, reason) => { string message = SR.QueryServiceSaveAsFail(Path.GetFileName(parameters.FilePath), reason); await requestContext.SendError(message); }; try { // Launch the task query.SaveAs(saveParams, fileFactory, successHandler, errorHandler); } catch (Exception e) { await errorHandler(saveParams, e.Message); } } // Internal for testing purposes internal string GetSqlText(ExecuteRequestParamsBase request) { // If it is a document selection, we'll retrieve the text from the document ExecuteDocumentSelectionParams docRequest = request as ExecuteDocumentSelectionParams; if (docRequest != null) { // Get the document from the parameters ScriptFile queryFile = WorkspaceService.Workspace.GetFile(docRequest.OwnerUri); // If a selection was not provided, use the entire document if (docRequest.QuerySelection == null) { return queryFile.Contents; } // A selection was provided, so get the lines in the selected range string[] queryTextArray = queryFile.GetLinesInRange( new BufferRange( new BufferPosition( docRequest.QuerySelection.StartLine + 1, docRequest.QuerySelection.StartColumn + 1 ), new BufferPosition( docRequest.QuerySelection.EndLine + 1, docRequest.QuerySelection.EndColumn + 1 ) ) ); return string.Join(Environment.NewLine, queryTextArray); } // If it is an ExecuteStringParams, return the text as is ExecuteStringParams stringRequest = request as ExecuteStringParams; if (stringRequest != null) { return stringRequest.Query; } // Note, this shouldn't be possible due to inheritance rules throw new InvalidCastException("Invalid request type"); } /// Internal for testing purposes internal Task UpdateSettings(SqlToolsSettings newSettings, SqlToolsSettings oldSettings, EventContext eventContext) { Settings.QueryExecutionSettings.Update(newSettings.QueryExecutionSettings); return Task.FromResult(0); } #endregion #region IDisposable Implementation private bool disposed; public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } private void Dispose(bool disposing) { if (disposed) { return; } if (disposing) { foreach (var query in ActiveQueries) { if (!query.Value.HasExecuted) { try { query.Value.Cancel(); } catch (Exception e) { // We don't particularly care if we fail to cancel during shutdown string message = string.Format("Failed to cancel query {0} during query service disposal: {1}", query.Key, e); Logger.Write(LogLevel.Warning, message); } } query.Value.Dispose(); } ActiveQueries.Clear(); } disposed = true; } ~QueryExecutionService() { Dispose(false); } #endregion } }