// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; using System.Data.Common; using System.Data.SqlClient; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.SqlServer.Management.SqlParser.Parser; using Microsoft.SqlTools.ServiceLayer.Connection; using Microsoft.SqlTools.ServiceLayer.Connection.ReliableConnection; using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts; using Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage; using Microsoft.SqlTools.ServiceLayer.SqlContext; using Microsoft.SqlTools.ServiceLayer.Utility; using Microsoft.SqlTools.ServiceLayer.Connection.Contracts; using System.Collections.Generic; namespace Microsoft.SqlTools.ServiceLayer.QueryExecution { /// /// Internal representation of an active query /// public class Query : IDisposable { /// /// "Error" code produced by SQL Server when the database context (name) for a connection changes. /// private const int DatabaseContextChangeErrorNumber = 5701; #region Member Variables /// /// Cancellation token source, used for cancelling async db actions /// private readonly CancellationTokenSource cancellationSource; /// /// For IDisposable implementation, whether or not this object has been disposed /// private bool disposed; /// /// The connection info associated with the file editor owner URI, used to create a new /// connection upon execution of the query /// private readonly ConnectionInfo editorConnection; /// /// Whether or not the execute method has been called for this query /// private bool hasExecuteBeenCalled; /// /// Settings for query runtime /// private QueryExecutionSettings querySettings; /// /// Streaming output factory for the query /// private IFileStreamFactory streamOutputFactory; /// /// ON keyword /// private const string On = "ON"; /// /// OFF keyword /// private const string Off = "OFF"; /// /// showplan_xml statement /// private const string SetShowPlanXml = "SET SHOWPLAN_XML {0}"; /// /// statistics xml statement /// private const string SetStatisticsXml = "SET STATISTICS XML {0}"; #endregion /// /// Constructor for a query /// /// The text of the query to execute /// The information of the connection to use to execute the query /// Settings for how to execute the query, from the user /// Factory for creating output files public Query(string queryText, ConnectionInfo connection, QueryExecutionSettings settings, IFileStreamFactory outputFactory) { // Sanity check for input Validate.IsNotNullOrEmptyString(nameof(queryText), queryText); Validate.IsNotNull(nameof(connection), connection); Validate.IsNotNull(nameof(settings), settings); Validate.IsNotNull(nameof(outputFactory), outputFactory); // Initialize the internal state QueryText = queryText; editorConnection = connection; cancellationSource = new CancellationTokenSource(); querySettings = settings; streamOutputFactory = outputFactory; // Process the query into batches ParseResult parseResult = Parser.Parse(queryText, new ParseOptions { BatchSeparator = settings.BatchSeparator }); // NOTE: We only want to process batches that have statements (ie, ignore comments and empty lines) var batchSelection = parseResult.Script.Batches .Where(batch => batch.Statements.Count > 0) .Select((batch, index) => new Batch(batch.Sql, new SelectionData( batch.StartLocation.LineNumber - 1, batch.StartLocation.ColumnNumber - 1, batch.EndLocation.LineNumber - 1, batch.EndLocation.ColumnNumber - 1), index, outputFactory)); Batches = batchSelection.ToArray(); // Create our batch lists BeforeBatches = new List(); AfterBatches = new List(); if (DoesSupportExecutionPlan(connection)) { // Checking settings for execution plan options if (querySettings.ExecutionPlanOptions.IncludeEstimatedExecutionPlanXml) { // Enable set showplan xml addBatch(string.Format(SetShowPlanXml, On), BeforeBatches, streamOutputFactory); addBatch(string.Format(SetShowPlanXml, Off), AfterBatches, streamOutputFactory); } else if (querySettings.ExecutionPlanOptions.IncludeActualExecutionPlanXml) { addBatch(string.Format(SetStatisticsXml, On), BeforeBatches, streamOutputFactory); addBatch(string.Format(SetStatisticsXml, Off), AfterBatches, streamOutputFactory); } } } #region Events /// /// Event to be called when a batch is completed. /// public event Batch.BatchAsyncEventHandler BatchCompleted; /// /// Event that will be called when a message has been emitted /// public event Batch.BatchAsyncMessageHandler BatchMessageSent; /// /// Event to be called when a batch starts execution. /// public event Batch.BatchAsyncEventHandler BatchStarted; /// /// Delegate type for callback when a query connection fails /// /// Error message for the failing query public delegate Task QueryAsyncErrorEventHandler(string message); /// /// Callback for when the query has completed successfully /// public event QueryAsyncEventHandler QueryCompleted; /// /// Callback for when the query has failed /// public event QueryAsyncEventHandler QueryFailed; /// /// Callback for when the query connection has failed /// public event QueryAsyncErrorEventHandler QueryConnectionException; /// /// Event to be called when a resultset has completed. /// public event ResultSet.ResultSetAsyncEventHandler ResultSetCompleted; #endregion #region Properties /// /// Delegate type for callback when a query completes or fails /// /// The query that completed public delegate Task QueryAsyncEventHandler(Query q); /// /// The batches which should run before the user batches /// internal List BeforeBatches { get; set; } /// /// The batches underneath this query /// internal Batch[] Batches { get; set; } /// /// The batches which should run after the user batches /// internal List AfterBatches { get; set; } /// /// The summaries of the batches underneath this query /// public BatchSummary[] BatchSummaries { get { if (!HasExecuted) { throw new InvalidOperationException("Query has not been executed."); } return Batches.Select(b => b.Summary).ToArray(); } } /// /// Storage for the async task for execution. Set as internal in order to await completion /// in unit tests. /// internal Task ExecutionTask { get; private set; } /// /// Whether or not the query has completed executed, regardless of success or failure /// /// /// Don't touch the setter unless you're doing unit tests! /// public bool HasExecuted { get { return Batches.Length == 0 ? hasExecuteBeenCalled : Batches.All(b => b.HasExecuted); } internal set { hasExecuteBeenCalled = value; foreach (var batch in Batches) { batch.HasExecuted = value; } } } /// /// The text of the query to execute /// public string QueryText { get; set; } #endregion #region Public Methods /// /// Cancels the query by issuing the cancellation token /// public void Cancel() { // Make sure that the query hasn't completed execution if (HasExecuted) { throw new InvalidOperationException(SR.QueryServiceCancelAlreadyCompleted); } // Issue the cancellation token for the query cancellationSource.Cancel(); } /// /// Launches the asynchronous process for executing the query /// public void Execute() { ExecutionTask = Task.Run(ExecuteInternal); } /// /// Retrieves a subset of the result sets /// /// The index for selecting the batch item /// The index for selecting the result set /// The starting row of the results /// How many rows to retrieve /// A subset of results public Task GetSubset(int batchIndex, int resultSetIndex, int startRow, int rowCount) { // Sanity check to make sure that the batch is within bounds if (batchIndex < 0 || batchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(batchIndex), SR.QueryServiceSubsetBatchOutOfRange); } return Batches[batchIndex].GetSubset(resultSetIndex, startRow, rowCount); } /// /// Retrieves a subset of the result sets /// /// The index for selecting the batch item /// The index for selecting the result set /// The Execution Plan, if the result set has one public Task GetExecutionPlan(int batchIndex, int resultSetIndex) { // Sanity check to make sure that the batch is within bounds if (batchIndex < 0 || batchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(batchIndex), SR.QueryServiceSubsetBatchOutOfRange); } return Batches[batchIndex].GetExecutionPlan(resultSetIndex); } /// /// Saves the requested results to a file format of the user's choice /// /// Parameters for the save as request /// /// Factory for creating the reader/writer pair for the requested output format /// /// Delegate to call when the request completes successfully /// Delegate to call if the request fails public void SaveAs(SaveResultsRequestParams saveParams, IFileStreamFactory fileFactory, ResultSet.SaveAsAsyncEventHandler successHandler, ResultSet.SaveAsFailureAsyncEventHandler failureHandler) { // Sanity check to make sure that the batch is within bounds if (saveParams.BatchIndex < 0 || saveParams.BatchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(saveParams.BatchIndex), SR.QueryServiceSubsetBatchOutOfRange); } Batches[saveParams.BatchIndex].SaveAs(saveParams, fileFactory, successHandler, failureHandler); } #endregion #region Private Helpers /// /// Executes this query asynchronously and collects all result sets /// private async Task ExecuteInternal() { // Mark that we've internally executed hasExecuteBeenCalled = true; // Don't actually execute if there aren't any batches to execute if (Batches.Length == 0) { if (BatchMessageSent != null) { await BatchMessageSent(new ResultMessage(SR.QueryServiceCompletedSuccessfully, false, null)); } if (QueryCompleted != null) { await QueryCompleted(this); } return; } // Locate and setup the connection DbConnection queryConnection = await ConnectionService.Instance.GetOrOpenConnection(editorConnection.OwnerUri, ConnectionType.Query); ReliableSqlConnection sqlConn = queryConnection as ReliableSqlConnection; if (sqlConn != null) { // Subscribe to database informational messages sqlConn.GetUnderlyingConnection().InfoMessage += OnInfoMessage; } try { // Execute beforeBatches synchronously, before the user defined batches foreach (Batch b in BeforeBatches) { await b.Execute(queryConnection, cancellationSource.Token); } // We need these to execute synchronously, otherwise the user will be very unhappy foreach (Batch b in Batches) { // Add completion callbacks b.BatchStart += BatchStarted; b.BatchCompletion += BatchCompleted; b.BatchMessageSent += BatchMessageSent; b.ResultSetCompletion += ResultSetCompleted; await b.Execute(queryConnection, cancellationSource.Token); } // Execute afterBatches synchronously, after the user defined batches foreach (Batch b in AfterBatches) { await b.Execute(queryConnection, cancellationSource.Token); } // Call the query execution callback if (QueryCompleted != null) { await QueryCompleted(this); } } catch (Exception) { // Call the query failure callback if (QueryFailed != null) { await QueryFailed(this); } } finally { if (sqlConn != null) { // Subscribe to database informational messages sqlConn.GetUnderlyingConnection().InfoMessage -= OnInfoMessage; } } } /// /// Handler for database messages during query execution /// private void OnInfoMessage(object sender, SqlInfoMessageEventArgs args) { SqlConnection conn = sender as SqlConnection; if (conn == null) { throw new InvalidOperationException(SR.QueryServiceMessageSenderNotSql); } foreach (SqlError error in args.Errors) { // Did the database context change (error code 5701)? if (error.Number == DatabaseContextChangeErrorNumber) { ConnectionService.Instance.ChangeConnectionDatabaseContext(editorConnection.OwnerUri, conn.Database); } } } /// /// Function to add a new batch to a Batch set /// private void addBatch(string query, List batchSet, IFileStreamFactory outputFactory) { batchSet.Add(new Batch(query, null, batchSet.Count, outputFactory)); } #endregion #region IDisposable Implementation public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (disposed) { return; } if (disposing) { cancellationSource.Dispose(); foreach (Batch b in Batches) { b.Dispose(); } } disposed = true; } /// /// Does this connection support XML Execution plans /// private bool DoesSupportExecutionPlan(ConnectionInfo connectionInfo) { // Determining which execution plan options may be applied (may be added to for pre-yukon support) return (!connectionInfo.IsSqlDW && connectionInfo.MajorVersion >= 9); } #endregion } }