// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; using System.Data.Common; using System.Data.SqlClient; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.SqlServer.Management.SqlParser.Parser; using Microsoft.SqlTools.ServiceLayer.Connection; using Microsoft.SqlTools.ServiceLayer.Connection.ReliableConnection; using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts; using Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage; using Microsoft.SqlTools.ServiceLayer.SqlContext; using Microsoft.SqlTools.ServiceLayer.Utility; namespace Microsoft.SqlTools.ServiceLayer.QueryExecution { /// /// Internal representation of an active query /// public class Query : IDisposable { /// /// "Error" code produced by SQL Server when the database context (name) for a connection changes. /// private const int DatabaseContextChangeErrorNumber = 5701; #region Member Variables /// /// Cancellation token source, used for cancelling async db actions /// private readonly CancellationTokenSource cancellationSource; /// /// For IDisposable implementation, whether or not this object has been disposed /// private bool disposed; /// /// The connection info associated with the file editor owner URI, used to create a new /// connection upon execution of the query /// private readonly ConnectionInfo editorConnection; /// /// Whether or not the execute method has been called for this query /// private bool hasExecuteBeenCalled; #endregion /// /// Constructor for a query /// /// The text of the query to execute /// The information of the connection to use to execute the query /// Settings for how to execute the query, from the user /// Factory for creating output files public Query(string queryText, ConnectionInfo connection, QueryExecutionSettings settings, IFileStreamFactory outputFactory) { // Sanity check for input Validate.IsNotNullOrEmptyString(nameof(queryText), queryText); Validate.IsNotNull(nameof(connection), connection); Validate.IsNotNull(nameof(settings), settings); Validate.IsNotNull(nameof(outputFactory), outputFactory); // Initialize the internal state QueryText = queryText; editorConnection = connection; cancellationSource = new CancellationTokenSource(); // Process the query into batches ParseResult parseResult = Parser.Parse(queryText, new ParseOptions { BatchSeparator = settings.BatchSeparator }); // NOTE: We only want to process batches that have statements (ie, ignore comments and empty lines) var batchSelection = parseResult.Script.Batches .Where(batch => batch.Statements.Count > 0) .Select((batch, index) => new Batch(batch.Sql, new SelectionData( batch.StartLocation.LineNumber - 1, batch.StartLocation.ColumnNumber - 1, batch.EndLocation.LineNumber - 1, batch.EndLocation.ColumnNumber - 1), index, outputFactory)); Batches = batchSelection.ToArray(); } #region Events /// /// Event to be called when a batch is completed. /// public event Batch.BatchAsyncEventHandler BatchCompleted; /// /// Event to be called when a batch starts execution. /// public event Batch.BatchAsyncEventHandler BatchStarted; /// /// Delegate type for callback when a query connection fails /// /// Error message for the failing query public delegate Task QueryAsyncErrorEventHandler(string message); /// /// Callback for when the query has completed successfully /// public event QueryAsyncEventHandler QueryCompleted; /// /// Callback for when the query has failed /// public event QueryAsyncEventHandler QueryFailed; /// /// Callback for when the query connection has failed /// public event QueryAsyncErrorEventHandler QueryConnectionException; /// /// Event to be called when a resultset has completed. /// public event ResultSet.ResultSetAsyncEventHandler ResultSetCompleted; #endregion #region Properties /// /// Delegate type for callback when a query completes or fails /// /// The query that completed public delegate Task QueryAsyncEventHandler(Query q); /// /// The batches underneath this query /// internal Batch[] Batches { get; set; } /// /// The summaries of the batches underneath this query /// public BatchSummary[] BatchSummaries { get { if (!HasExecuted) { throw new InvalidOperationException("Query has not been executed."); } return Batches.Select(b => b.Summary).ToArray(); } } /// /// Storage for the async task for execution. Set as internal in order to await completion /// in unit tests. /// internal Task ExecutionTask { get; private set; } /// /// Whether or not the query has completed executed, regardless of success or failure /// /// /// Don't touch the setter unless you're doing unit tests! /// public bool HasExecuted { get { return Batches.Length == 0 ? hasExecuteBeenCalled : Batches.All(b => b.HasExecuted); } internal set { hasExecuteBeenCalled = value; foreach (var batch in Batches) { batch.HasExecuted = value; } } } /// /// The text of the query to execute /// public string QueryText { get; set; } #endregion #region Public Methods /// /// Cancels the query by issuing the cancellation token /// public void Cancel() { // Make sure that the query hasn't completed execution if (HasExecuted) { throw new InvalidOperationException(SR.QueryServiceCancelAlreadyCompleted); } // Issue the cancellation token for the query cancellationSource.Cancel(); } /// /// Launches the asynchronous process for executing the query /// public void Execute() { ExecutionTask = Task.Run(ExecuteInternal); } /// /// Retrieves a subset of the result sets /// /// The index for selecting the batch item /// The index for selecting the result set /// The starting row of the results /// How many rows to retrieve /// A subset of results public Task GetSubset(int batchIndex, int resultSetIndex, int startRow, int rowCount) { // Sanity check to make sure that the batch is within bounds if (batchIndex < 0 || batchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(batchIndex), SR.QueryServiceSubsetBatchOutOfRange); } return Batches[batchIndex].GetSubset(resultSetIndex, startRow, rowCount); } /// /// Saves the requested results to a file format of the user's choice /// /// Parameters for the save as request /// /// Factory for creating the reader/writer pair for the requested output format /// /// Delegate to call when the request completes successfully /// Delegate to call if the request fails public void SaveAs(SaveResultsRequestParams saveParams, IFileStreamFactory fileFactory, ResultSet.SaveAsAsyncEventHandler successHandler, ResultSet.SaveAsFailureAsyncEventHandler failureHandler) { // Sanity check to make sure that the batch is within bounds if (saveParams.BatchIndex < 0 || saveParams.BatchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(saveParams.BatchIndex), SR.QueryServiceSubsetBatchOutOfRange); } Batches[saveParams.BatchIndex].SaveAs(saveParams, fileFactory, successHandler, failureHandler); } #endregion #region Private Helpers /// /// Executes this query asynchronously and collects all result sets /// private async Task ExecuteInternal() { // Mark that we've internally executed hasExecuteBeenCalled = true; // Don't actually execute if there aren't any batches to execute if (Batches.Length == 0) { return; } // Open up a connection for querying the database string connectionString = ConnectionService.BuildConnectionString(editorConnection.ConnectionDetails); // TODO: Don't create a new connection every time, see TFS #834978 using (DbConnection conn = editorConnection.Factory.CreateSqlConnection(connectionString)) { try { await conn.OpenAsync(); } catch (Exception exception) { this.HasExecuted = true; if (QueryConnectionException != null) { await QueryConnectionException(exception.Message); } return; } ReliableSqlConnection sqlConn = conn as ReliableSqlConnection; if (sqlConn != null) { // Subscribe to database informational messages sqlConn.GetUnderlyingConnection().InfoMessage += OnInfoMessage; } try { // We need these to execute synchronously, otherwise the user will be very unhappy foreach (Batch b in Batches) { b.BatchStart += BatchStarted; b.BatchCompletion += BatchCompleted; b.ResultSetCompletion += ResultSetCompleted; await b.Execute(conn, cancellationSource.Token); } // Call the query execution callback if (QueryCompleted != null) { await QueryCompleted(this); } } catch (Exception) { // Call the query failure callback if (QueryFailed != null) { await QueryFailed(this); } } finally { if (sqlConn != null) { // Subscribe to database informational messages sqlConn.GetUnderlyingConnection().InfoMessage -= OnInfoMessage; } } // TODO: Close connection after eliminating using statement for above TODO } } /// /// Handler for database messages during query execution /// private void OnInfoMessage(object sender, SqlInfoMessageEventArgs args) { SqlConnection conn = sender as SqlConnection; if (conn == null) { throw new InvalidOperationException(SR.QueryServiceMessageSenderNotSql); } foreach (SqlError error in args.Errors) { // Did the database context change (error code 5701)? if (error.Number == DatabaseContextChangeErrorNumber) { ConnectionService.Instance.ChangeConnectionDatabaseContext(editorConnection.OwnerUri, conn.Database); } } } #endregion #region IDisposable Implementation public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (disposed) { return; } if (disposing) { cancellationSource.Dispose(); foreach (Batch b in Batches) { b.Dispose(); } } disposed = true; } #endregion } }