// // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. // using System; using System.Data.Common; using System.Data.SqlClient; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.SqlTools.ServiceLayer.BatchParser; using Microsoft.Kusto.ServiceLayer.Connection; using Microsoft.Kusto.ServiceLayer.QueryExecution.Contracts; using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage; using Microsoft.Kusto.ServiceLayer.SqlContext; using Microsoft.SqlTools.Utility; using Microsoft.SqlTools.ServiceLayer.BatchParser.ExecutionEngineCode; using System.Collections.Generic; using System.Diagnostics; using Microsoft.Kusto.ServiceLayer.Utility; using System.Text; namespace Microsoft.Kusto.ServiceLayer.QueryExecution { /// /// Internal representation of an active query /// public class Query : IDisposable { #region Member Variables /// /// Cancellation token source, used for cancelling async db actions /// private readonly CancellationTokenSource cancellationSource; /// /// For IDisposable implementation, whether or not this object has been disposed /// private bool disposed; /// /// The connection info associated with the file editor owner URI, used to create a new /// connection upon execution of the query /// private readonly ConnectionInfo editorConnection; /// /// Whether or not the execute method has been called for this query /// private bool hasExecuteBeenCalled; #endregion /// /// Constructor for a query /// /// The text of the query to execute /// The information of the connection to use to execute the query /// Settings for how to execute the query, from the user /// Factory for creating output files public Query( string queryText, ConnectionInfo connection, QueryExecutionSettings settings, IFileStreamFactory outputFactory, bool getFullColumnSchema = false, bool applyExecutionSettings = false) { // Sanity check for input Validate.IsNotNull(nameof(queryText), queryText); Validate.IsNotNull(nameof(connection), connection); Validate.IsNotNull(nameof(settings), settings); Validate.IsNotNull(nameof(outputFactory), outputFactory); // Initialize the internal state QueryText = queryText; editorConnection = connection; cancellationSource = new CancellationTokenSource(); // Process the query into batches BatchParserWrapper parser = new BatchParserWrapper(); ExecutionEngineConditions conditions = null; List parserResult = parser.GetBatches(queryText, conditions); var batchSelection = parserResult .Select((batchDefinition, index) => new Batch(batchDefinition.BatchText, new SelectionData( batchDefinition.StartLine-1, batchDefinition.StartColumn-1, batchDefinition.EndLine-1, batchDefinition.EndColumn-1), index, outputFactory, batchDefinition.BatchExecutionCount, getFullColumnSchema)); Batches = batchSelection.ToArray(); // Create our batch lists BeforeBatches = new List(); AfterBatches = new List(); } #region Events /// /// Delegate type for callback when a query completes or fails /// /// The query that completed public delegate Task QueryAsyncEventHandler(Query query); /// /// Delegate type for callback when a query fails /// /// Query that raised the event /// Exception that caused the query to fail public delegate Task QueryAsyncErrorEventHandler(Query query, Exception exception); /// /// Event to be called when a batch is completed. /// public event Batch.BatchAsyncEventHandler BatchCompleted; /// /// Event that will be called when a message has been emitted /// public event Batch.BatchAsyncMessageHandler BatchMessageSent; /// /// Event to be called when a batch starts execution. /// public event Batch.BatchAsyncEventHandler BatchStarted; /// /// Callback for when the query has completed successfully /// public event QueryAsyncEventHandler QueryCompleted; /// /// Callback for when the query has failed /// public event QueryAsyncErrorEventHandler QueryFailed; /// /// Event to be called when a resultset has completed. /// public event ResultSet.ResultSetAsyncEventHandler ResultSetCompleted; /// /// Event that will be called when the resultSet first becomes available. This is as soon as we start reading the results. /// public event ResultSet.ResultSetAsyncEventHandler ResultSetAvailable; /// /// Event that will be called when additional rows in the result set are available (rowCount available has increased) /// public event ResultSet.ResultSetAsyncEventHandler ResultSetUpdated; #endregion #region Properties /// /// The batches which should run before the user batches /// private List BeforeBatches { get; } /// /// The batches underneath this query /// internal Batch[] Batches { get; } /// /// The batches which should run after the user batches /// internal List AfterBatches { get; } /// /// The summaries of the batches underneath this query /// public BatchSummary[] BatchSummaries { get { if (!HasExecuted && !HasCancelled && !HasErrored) { throw new InvalidOperationException("Query has not been executed."); } return Batches.Select(b => b.Summary).ToArray(); } } /// /// Storage for the async task for execution. Set as internal in order to await completion /// in unit tests. /// internal Task ExecutionTask { get; private set; } /// /// Whether or not the query has completed executed, regardless of success or failure /// /// /// Don't touch the setter unless you're doing unit tests! /// public bool HasExecuted { get { return Batches.Length == 0 ? hasExecuteBeenCalled : Batches.All(b => b.HasExecuted); } internal set { hasExecuteBeenCalled = value; foreach (var batch in Batches) { batch.HasExecuted = value; } } } /// /// if the query has been cancelled (before execution started) /// public bool HasCancelled { get; private set; } /// /// if the query has errored out (before batch execution started) /// public bool HasErrored { get; private set; } /// /// The text of the query to execute /// public string QueryText { get; } #endregion #region Public Methods /// /// Cancels the query by issuing the cancellation token /// public void Cancel() { // Make sure that the query hasn't completed execution if (HasExecuted) { throw new InvalidOperationException(SR.QueryServiceCancelAlreadyCompleted); } // Issue the cancellation token for the query this.HasCancelled = true; cancellationSource.Cancel(); } /// /// Launches the asynchronous process for executing the query /// public void Execute() { ExecutionTask = Task.Run(ExecuteInternal) .ContinueWithOnFaulted(async t => { if (QueryFailed != null) { await QueryFailed(this, t.Exception); } }); } /// /// Retrieves a subset of the result sets /// /// The index for selecting the batch item /// The index for selecting the result set /// The starting row of the results /// How many rows to retrieve /// A subset of results public Task GetSubset(int batchIndex, int resultSetIndex, long startRow, int rowCount) { Logger.Write(TraceEventType.Start, $"Starting GetSubset execution for batchIndex:'{batchIndex}', resultSetIndex:'{resultSetIndex}', startRow:'{startRow}', rowCount:'{rowCount}'"); // Sanity check to make sure that the batch is within bounds if (batchIndex < 0 || batchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(batchIndex), SR.QueryServiceSubsetBatchOutOfRange); } return Batches[batchIndex].GetSubset(resultSetIndex, startRow, rowCount); } /// /// Retrieves a subset of the result sets /// /// The index for selecting the batch item /// The index for selecting the result set /// The Execution Plan, if the result set has one public Task GetExecutionPlan(int batchIndex, int resultSetIndex) { // Sanity check to make sure that the batch is within bounds if (batchIndex < 0 || batchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(batchIndex), SR.QueryServiceSubsetBatchOutOfRange); } return Batches[batchIndex].GetExecutionPlan(resultSetIndex); } /// /// Saves the requested results to a file format of the user's choice /// /// Parameters for the save as request /// /// Factory for creating the reader/writer pair for the requested output format /// /// Delegate to call when the request completes successfully /// Delegate to call if the request fails public void SaveAs(SaveResultsRequestParams saveParams, IFileStreamFactory fileFactory, ResultSet.SaveAsAsyncEventHandler successHandler, ResultSet.SaveAsFailureAsyncEventHandler failureHandler) { // Sanity check to make sure that the batch is within bounds if (saveParams.BatchIndex < 0 || saveParams.BatchIndex >= Batches.Length) { throw new ArgumentOutOfRangeException(nameof(saveParams.BatchIndex), SR.QueryServiceSubsetBatchOutOfRange); } Batches[saveParams.BatchIndex].SaveAs(saveParams, fileFactory, successHandler, failureHandler); } #endregion #region Private Helpers /// /// Executes this query asynchronously and collects all result sets /// private async Task ExecuteInternal() { ReliableDataSourceConnection sqlConn = null; try { // check for cancellation token before actually making connection cancellationSource.Token.ThrowIfCancellationRequested(); // Mark that we've internally executed hasExecuteBeenCalled = true; // Don't actually execute if there aren't any batches to execute if (Batches.Length == 0) { if (BatchMessageSent != null) { await BatchMessageSent(new ResultMessage(SR.QueryServiceCompletedSuccessfully, false, null)); } if (QueryCompleted != null) { await QueryCompleted(this); } return; } // Locate and setup the connection ReliableDataSourceConnection queryConnection = await ConnectionService.Instance.GetOrOpenConnection(editorConnection.OwnerUri, ConnectionType.Query); // Execute beforeBatches synchronously, before the user defined batches foreach (Batch b in BeforeBatches) { await b.Execute(queryConnection, cancellationSource.Token); } // We need these to execute synchronously, otherwise the user will be very unhappy foreach (Batch b in Batches) { // Add completion callbacks b.BatchStart += BatchStarted; b.BatchCompletion += BatchCompleted; b.BatchMessageSent += BatchMessageSent; b.ResultSetCompletion += ResultSetCompleted; b.ResultSetAvailable += ResultSetAvailable; b.ResultSetUpdated += ResultSetUpdated; await b.Execute(queryConnection, cancellationSource.Token); } // Execute afterBatches synchronously, after the user defined batches foreach (Batch b in AfterBatches) { await b.Execute(queryConnection, cancellationSource.Token); } // Call the query execution callback if (QueryCompleted != null) { await QueryCompleted(this); } } catch (Exception e) { HasErrored = true; if (e is OperationCanceledException) { await BatchMessageSent(new ResultMessage(SR.QueryServiceQueryCancelled, false, null)); } // Call the query failure callback if (QueryFailed != null) { await QueryFailed(this, e); } } finally { foreach (Batch b in Batches) { if (b.HasError) { ConnectionService.EnsureConnectionIsOpen(sqlConn); break; } } } } /// /// Function to add a new batch to a Batch set /// private static void AddBatch(string query, ICollection batchSet, IFileStreamFactory outputFactory) { batchSet.Add(new Batch(query, null, batchSet.Count, outputFactory, 1)); } #endregion #region IDisposable Implementation public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (disposed) { return; } if (disposing) { cancellationSource.Dispose(); foreach (Batch b in Batches) { b.Dispose(); } } disposed = true; } #endregion } }