//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

#nullable disable
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.SqlClient;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Kusto.ServiceLayer.Connection;
using Microsoft.Kusto.ServiceLayer.QueryExecution.Contracts;
using Microsoft.Kusto.ServiceLayer.QueryExecution.DataStorage;
using Microsoft.SqlTools.Utility;
using System.Globalization;

namespace Microsoft.Kusto.ServiceLayer.QueryExecution
{
    /// <summary>
    /// This class represents a batch within a query
    /// </summary>
    public class Batch : IDisposable
    {
        #region Member Variables

        /// <summary>
        /// For IDisposable implementation, whether or not this has been disposed
        /// </summary>
        private bool disposed;

        /// <summary>
        /// Local time when the execution and retrieval of files is finished
        /// </summary>
        private DateTime executionEndTime;

        /// <summary>
        /// Local time when the execution starts. Initialized when the object is created and
        /// reset when the execution loop actually begins.
        /// </summary>
        private DateTime executionStartTime;

        /// <summary>
        /// Whether or not any messages have been sent
        /// </summary>
        private bool messagesSent;

        /// <summary>
        /// Factory for creating readers/writers for the output of the batch
        /// </summary>
        private readonly IFileStreamFactory outputFileFactory;

        /// <summary>
        /// Internal representation of the result sets so we can modify internally.
        /// Also used as the lock object guarding access to the list.
        /// </summary>
        private readonly List<ResultSet> resultSets;

        /// <summary>
        /// Special action which this batch performed
        /// </summary>
        private readonly SpecialAction specialAction;

        /// <summary>
        /// Flag indicating whether a separate KeyInfo query should be run
        /// to get the full ColumnSchema metadata.
        /// </summary>
        private readonly bool getFullColumnSchema;

        #endregion

        /// <summary>
        /// Creates a new batch that has not been executed yet.
        /// </summary>
        /// <param name="batchText">Text of the batch to execute; must not be null or empty</param>
        /// <param name="selection">The range from the file that is this batch</param>
        /// <param name="ordinalId">Ordinal of the batch in the query</param>
        /// <param name="outputFileFactory">Factory for creating readers/writers for the output of the batch; must not be null</param>
        /// <param name="executionCount">Number of times to execute the batch; values below 1 are treated as 1</param>
        /// <param name="getFullColumnSchema">Whether a separate KeyInfo query should be run to get the full ColumnSchema metadata</param>
        internal Batch(string batchText, SelectionData selection, int ordinalId,
            IFileStreamFactory outputFileFactory, int executionCount = 1, bool getFullColumnSchema = false)
        {
            // Sanity check for input
            Validate.IsNotNullOrEmptyString(nameof(batchText), batchText);
            Validate.IsNotNull(nameof(outputFileFactory), outputFileFactory);
            Validate.IsGreaterThan(nameof(ordinalId), ordinalId, 0);

            // Initialize the internal state
            BatchText = batchText;
            Selection = selection;
            executionStartTime = DateTime.Now;
            HasExecuted = false;
            Id = ordinalId;
            resultSets = new List<ResultSet>();
            this.outputFileFactory = outputFileFactory;
            specialAction = new SpecialAction();
            BatchExecutionCount = executionCount > 0 ? executionCount : 1;
            this.getFullColumnSchema = getFullColumnSchema;
        }

        #region Events

        /// <summary>
        /// Asynchronous handler for when batches are completed
        /// </summary>
        /// <param name="batch">The batch that completed</param>
        public delegate Task BatchAsyncEventHandler(Batch batch);

        /// <summary>
        /// Asynchronous handler for when a message is emitted by the sql connection
        /// </summary>
        /// <param name="message">The message that was emitted</param>
        public delegate Task BatchAsyncMessageHandler(ResultMessage message);

        /// <summary>
        /// Event that will be called when the batch has completed execution
        /// </summary>
        public event BatchAsyncEventHandler BatchCompletion;

        /// <summary>
        /// Event that will be called when a message has been emitted
        /// </summary>
        public event BatchAsyncMessageHandler BatchMessageSent;

        /// <summary>
        /// Event to call when the batch has started execution
        /// </summary>
        public event BatchAsyncEventHandler BatchStart;

        /// <summary>
        /// Event that will be called when the resultset has completed execution. It will not be
        /// called from the Batch but from the ResultSet instance.
        /// </summary>
        public event ResultSet.ResultSetAsyncEventHandler ResultSetCompletion;

        /// <summary>
        /// Event that will be called when the resultSet first becomes available. This is as soon as we start reading the results. It will not be
        /// called from the Batch but from the ResultSet instance.
        /// </summary>
        public event ResultSet.ResultSetAsyncEventHandler ResultSetAvailable;

        /// <summary>
        /// Event that will be called when additional rows in the result set are available (rowCount available has increased). It will not be
        /// called from the Batch but from the ResultSet instance.
        /// </summary>
        public event ResultSet.ResultSetAsyncEventHandler ResultSetUpdated;

        #endregion

        #region Properties

        /// <summary>
        /// The text of batch that will be executed
        /// </summary>
        public string BatchText { get; set; }

        /// <summary>
        /// Number of times the batch is executed; always at least 1
        /// </summary>
        public int BatchExecutionCount { get; private set; }

        /// <summary>
        /// Timestamp for when the execution completed.
        /// Formatted with the round-trip ("o") ISO 8601 specifier from the local clock value;
        /// should be localized before displaying to any user
        /// </summary>
        public string ExecutionEndTimeStamp => executionEndTime.ToString("o");

        /// <summary>
        /// Elapsed time between the start and the end of the execution, as a string
        /// </summary>
        public string ExecutionElapsedTime
        {
            get
            {
                TimeSpan elapsedTime = executionEndTime - executionStartTime;
                return elapsedTime.ToString();
            }
        }

        /// <summary>
        /// Timestamp for when the execution began.
        /// Formatted with the round-trip ("o") ISO 8601 specifier from the local clock value;
        /// should be localized before displaying to any user
        /// </summary>
        public string ExecutionStartTimeStamp => executionStartTime.ToString("o");

        /// <summary>
        /// Whether or not this batch encountered an error that halted execution
        /// </summary>
        public bool HasError { get; set; }

        /// <summary>
        /// Whether or not this batch has been executed, regardless of success or failure
        /// </summary>
        public bool HasExecuted { get; set; }

        /// <summary>
        /// Ordinal of the batch in the query
        /// </summary>
        public int Id { get; }

        /// <summary>
        /// The result sets of the batch execution
        /// </summary>
        public IList<ResultSet> ResultSets => resultSets;

        /// <summary>
        /// Property for generating a set result set summaries from the result sets
        /// </summary>
        public ResultSetSummary[] ResultSummaries
        {
            get
            {
                lock (resultSets)
                {
                    return resultSets.Select(set => set.Summary).ToArray();
                }
            }
        }

        /// <summary>
        /// Creates a <see cref="BatchSummary"/> based on the batch instance
        /// </summary>
        public BatchSummary Summary
        {
            get
            {
                // Batch summary with information available at start
                BatchSummary summary = new BatchSummary
                {
                    Id = Id,
                    Selection = Selection,
                    ExecutionStart = ExecutionStartTimeStamp,
                    HasError = HasError
                };

                // Add on extra details if we finished executing it
                if (HasExecuted)
                {
                    summary.ResultSetSummaries = ResultSummaries;
                    summary.ExecutionEnd = ExecutionEndTimeStamp;
                    summary.ExecutionElapsed = ExecutionElapsedTime;
                    summary.SpecialAction = ProcessResultSetSpecialActions();
                }

                return summary;
            }
        }

        /// <summary>
        /// The range from the file that is this batch
        /// </summary>
        internal SelectionData Selection { get; set; }

        #endregion

        #region Public Methods

        /// <summary>
        /// Executes this batch and captures any server messages that are returned.
        /// Fires <see cref="BatchStart"/> before executing and <see cref="BatchCompletion"/>
        /// afterwards, whether or not the execution succeeded.
        /// </summary>
        /// <param name="conn">The connection to use to execute the batch</param>
        /// <param name="cancellationToken">Token for cancelling the execution</param>
        /// <exception cref="InvalidOperationException">The batch has already been executed</exception>
        /// <exception cref="OperationCanceledException">The execution was cancelled; rethrown after the cancellation message is sent</exception>
        public async Task Execute(ReliableDataSourceConnection conn, CancellationToken cancellationToken)
        {
            // Sanity check to make sure we haven't already run this batch
            if (HasExecuted)
            {
                throw new InvalidOperationException("Batch has already executed.");
            }

            // Notify that we've started execution
            if (BatchStart != null)
            {
                await BatchStart(this);
            }

            try
            {
                await DoExecute(conn, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Cancellation isn't considered an error condition.
                // CancellationToken.ThrowIfCancellationRequested throws OperationCanceledException,
                // the base type of TaskCanceledException, so catching the base type covers both.
                await SendMessage(SR.QueryServiceQueryCancelled, false);
                throw;
            }
            catch (Exception e)
            {
                HasError = true;
                await SendMessage(SR.QueryServiceQueryFailed(e.Message), true);
                throw;
            }
            finally
            {
                // Mark that we have executed
                HasExecuted = true;
                executionEndTime = DateTime.Now;

                // Fire an event to signify that the batch has completed
                if (BatchCompletion != null)
                {
                    await BatchCompletion(this);
                }
            }
        }

        /// <summary>
        /// Runs the batch <see cref="BatchExecutionCount"/> times, stopping early when a
        /// <see cref="DbException"/> is reported as not ignorable.
        /// </summary>
        /// <param name="conn">The connection to use to execute the batch</param>
        /// <param name="cancellationToken">Token for cancelling the execution</param>
        private async Task DoExecute(ReliableDataSourceConnection conn, CancellationToken cancellationToken)
        {
            bool canContinue = true;
            int timesLoop = this.BatchExecutionCount;

            await SendMessageIfExecutingMultipleTimes(SR.EE_ExecutionInfo_InitializingLoop, false);

            // Restart the clock so the elapsed time measures the execution itself
            executionStartTime = DateTime.Now;

            while (canContinue && timesLoop > 0)
            {
                try
                {
                    await ExecuteOnce(conn, cancellationToken);
                }
                catch (DbException dbe)
                {
                    HasError = true;
                    canContinue = await UnwrapDbException(dbe);
                    if (canContinue)
                    {
                        // If it's a multi-batch, we notify the user that we're ignoring a single failure.
                        await SendMessageIfExecutingMultipleTimes(SR.EE_BatchExecutionError_Ignoring, false);
                    }
                }
                timesLoop--;
            }

            await SendMessageIfExecutingMultipleTimes(
                string.Format(CultureInfo.CurrentCulture, SR.EE_ExecutionInfo_FinalizingLoop, this.BatchExecutionCount),
                false);
        }

        /// <summary>
        /// Sends the message only when the batch is configured to execute more than once.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <param name="isError">Whether the message represents an error</param>
        private async Task SendMessageIfExecutingMultipleTimes(string message, bool isError)
        {
            if (IsExecutingMultipleTimes())
            {
                await SendMessage(message, isError);
            }
        }

        /// <summary>
        /// Whether the batch is configured to execute more than once.
        /// </summary>
        private bool IsExecutingMultipleTimes()
        {
            return this.BatchExecutionCount > 1;
        }

        /// <summary>
        /// Executes the batch text a single time and reads every result set returned by the reader.
        /// </summary>
        /// <param name="conn">The connection to use to execute the batch</param>
        /// <param name="cancellationToken">Token for cancelling the execution</param>
        private async Task ExecuteOnce(ReliableDataSourceConnection conn, CancellationToken cancellationToken)
        {
            // Make sure we haven't cancelled yet
            cancellationToken.ThrowIfCancellationRequested();

            ConnectionService.EnsureConnectionIsOpen(conn);

            // Execute the command to get back a reader
            using (IDataReader reader = await conn.GetUnderlyingConnection().ExecuteQueryAsync(BatchText, cancellationToken, conn.Database))
            {
                do
                {
                    // Verify that the cancellation token hasn't been canceled
                    cancellationToken.ThrowIfCancellationRequested();

                    // This resultset has results (i.e. SELECT/etc queries)
                    ResultSet resultSet = new ResultSet(resultSets.Count, Id, outputFileFactory);
                    resultSet.ResultAvailable += ResultSetAvailable;
                    resultSet.ResultUpdated += ResultSetUpdated;
                    resultSet.ResultCompletion += ResultSetCompletion;

                    // Add the result set to the results of the query
                    lock (resultSets)
                    {
                        resultSets.Add(resultSet);
                    }

                    // Read until we hit the end of the result set
                    await resultSet.ReadResultToEnd(reader, cancellationToken);

                } while (reader.NextResult());

                // If no message has been sent so far, for whatever reason,
                // output a "successful" message
                if (!messagesSent)
                {
                    await SendMessage(SR.QueryServiceCompletedSuccessfully, false);
                }
            }
        }

        /// <summary>
        /// Generates a subset of the rows from a result set of the batch
        /// </summary>
        /// <param name="resultSetIndex">The index for selecting the result set</param>
        /// <param name="startRow">The starting row of the results</param>
        /// <param name="rowCount">How many rows to retrieve</param>
        /// <returns>A subset of results</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="resultSetIndex"/> does not identify a result set of this batch</exception>
        public Task<ResultSetSubset> GetSubset(int resultSetIndex, long startRow, int rowCount)
        {
            ResultSet targetResultSet;
            lock (resultSets)
            {
                // Sanity check to make sure we have valid numbers
                if (resultSetIndex < 0 || resultSetIndex >= resultSets.Count)
                {
                    throw new ArgumentOutOfRangeException(nameof(resultSetIndex),
                        SR.QueryServiceSubsetResultSetOutOfRange);
                }

                targetResultSet = resultSets[resultSetIndex];
            }

            // Retrieve the result set
            return targetResultSet.GetSubset(startRow, rowCount);
        }

        /// <summary>
        /// Generates an execution plan
        /// </summary>
        /// <param name="resultSetIndex">The index for selecting the result set</param>
        /// <returns>An execution plan object</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="resultSetIndex"/> does not identify a result set of this batch</exception>
        public Task<ExecutionPlan> GetExecutionPlan(int resultSetIndex)
        {
            ResultSet targetResultSet;
            lock (resultSets)
            {
                // Sanity check to make sure we have valid numbers
                if (resultSetIndex < 0 || resultSetIndex >= resultSets.Count)
                {
                    throw new ArgumentOutOfRangeException(nameof(resultSetIndex),
                        SR.QueryServiceSubsetResultSetOutOfRange);
                }

                targetResultSet = resultSets[resultSetIndex];
            }

            // Retrieve the result set
            return targetResultSet.GetExecutionPlan();
        }

        /// <summary>
        /// Saves a result to a file format selected by the user
        /// </summary>
        /// <param name="saveParams">Parameters for the save as request</param>
        /// <param name="fileFactory">
        /// Factory for creating the reader/writer pair for outputing to the selected format
        /// </param>
        /// <param name="successHandler">Delegate to call when request successfully completes</param>
        /// <param name="failureHandler">Delegate to call if the request fails</param>
        /// <exception cref="ArgumentOutOfRangeException">The result set index of <paramref name="saveParams"/> does not identify a result set of this batch</exception>
        public void SaveAs(SaveResultsRequestParams saveParams, IFileStreamFactory fileFactory,
            ResultSet.SaveAsAsyncEventHandler successHandler, ResultSet.SaveAsFailureAsyncEventHandler failureHandler)
        {
            // Get the result set to save
            ResultSet resultSet;
            lock (resultSets)
            {
                // Sanity check to make sure we have a valid result set
                if (saveParams.ResultSetIndex < 0 || saveParams.ResultSetIndex >= resultSets.Count)
                {
                    // Report the member that was actually validated
                    throw new ArgumentOutOfRangeException(nameof(saveParams.ResultSetIndex),
                        SR.QueryServiceSubsetResultSetOutOfRange);
                }

                resultSet = resultSets[saveParams.ResultSetIndex];
            }

            resultSet.SaveAs(saveParams, fileFactory, successHandler, failureHandler);
        }

        #endregion

        #region Private Helpers

        /// <summary>
        /// Raises <see cref="BatchMessageSent"/> with a message tagged with this batch's id.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <param name="isError">Whether the message represents an error</param>
        private async Task SendMessage(string message, bool isError)
        {
            // If the message event is null, this is a no-op
            if (BatchMessageSent == null)
            {
                return;
            }

            // State that we've sent any message, and send it
            messagesSent = true;
            await BatchMessageSent(new ResultMessage(message, isError, Id));
        }

        /// <summary>
        /// Handler for when the StatementCompleted event is fired for this batch's command. This
        /// will be executed ONLY when there is a rowcount to report. If this event is not fired
        /// either NOCOUNT has been set or the command doesn't affect records.
        /// </summary>
        /// <param name="sender">Sender of the event</param>
        /// <param name="args">Arguments for the event</param>
        internal void StatementCompletedHandler(object sender, StatementCompletedEventArgs args)
        {
            // Add a message for the number of rows the query returned
            string message = args.RecordCount == 1
                ? SR.QueryServiceAffectedOneRow
                : SR.QueryServiceAffectedRows(args.RecordCount);

            // The event signature is synchronous, so the send is blocked on here
            SendMessage(message, false).Wait();
        }

        /// <summary>
        /// Delegate handler for storing messages that are returned from the server
        /// </summary>
        /// <param name="sender">Object that fired the event</param>
        /// <param name="args">Arguments from the event</param>
        private async void ServerMessageHandler(object sender, SqlInfoMessageEventArgs args)
        {
            foreach (SqlError error in args.Errors)
            {
                await HandleSqlErrorMessage(error.Number, error.Class, error.State, error.LineNumber, error.Procedure, error.Message);
            }
        }

        /// <summary>
        /// Handle a single SqlError's error message by processing and displaying it. The arguments come from the error being handled
        /// </summary>
        /// <param name="errorNumber">Number of the error</param>
        /// <param name="errorClass">Class of the error; values above 10 are reported as errors</param>
        /// <param name="state">State of the error</param>
        /// <param name="lineNumber">Line number reported with the error</param>
        /// <param name="procedure">Name of the procedure that raised the error, if any</param>
        /// <param name="message">Text of the error message</param>
        internal async Task HandleSqlErrorMessage(int errorNumber, byte errorClass, byte state, int lineNumber, string procedure, string message)
        {
            // Did the database context change (error code 5701)?
            if (errorNumber == 5701)
            {
                return;
            }

            string detailedMessage;
            if (string.IsNullOrEmpty(procedure))
            {
                // No procedure: offset the line number by the start of the selection, when there is one
                detailedMessage = string.Format("Msg {0}, Level {1}, State {2}, Line {3}{4}{5}",
                    errorNumber, errorClass, state, lineNumber + (Selection != null ? Selection.StartLine : 0),
                    Environment.NewLine, message);
            }
            else
            {
                detailedMessage = string.Format("Msg {0}, Level {1}, State {2}, Procedure {3}, Line {4}{5}{6}",
                    errorNumber, errorClass, state, procedure, lineNumber,
                    Environment.NewLine, message);
            }

            bool isError;
            if (errorClass > 10)
            {
                isError = true;
            }
            else if (errorClass > 0 && errorNumber > 0)
            {
                isError = false;
            }
            else
            {
                // Neither a class nor a number worth reporting: send the bare message text instead
                isError = false;
                detailedMessage = null;
            }

            if (detailedMessage != null)
            {
                await SendMessage(detailedMessage, isError);
            }
            else
            {
                await SendMessage(message, isError);
            }

            if (isError)
            {
                this.HasError = true;
            }
        }

        /// <summary>
        /// Attempts to convert an <see cref="Exception"/> to a <see cref="SqlException"/> that
        /// contains much more info about Sql Server errors. The exception is then unwrapped and
        /// messages are formatted and sent to the extension. If the exception cannot be
        /// converted to SqlException, the message is written to the messages list.
        /// </summary>
        /// <param name="dbe">The exception to unwrap</param>
        /// <returns>true is exception can be ignored when in a loop, false otherwise</returns>
        private async Task<bool> UnwrapDbException(Exception dbe)
        {
            bool canIgnore = true;
            SqlException se = dbe as SqlException;
            if (se != null)
            {
                var errors = se.Errors.Cast<SqlError>().ToList();

                // Detect user cancellation errors
                if (errors.Any(error => error.Class == 11 && error.Number == 0))
                {
                    // User cancellation error, add the single message
                    await SendMessage(SR.QueryServiceQueryCancelled, false);
                    canIgnore = false;
                }
                else
                {
                    // Not a user cancellation error, add all
                    foreach (var error in errors)
                    {
                        int lineNumber = error.LineNumber + (Selection != null ? Selection.StartLine : 0);
                        string message = string.Format("Msg {0}, Level {1}, State {2}, Line {3}{4}{5}",
                            error.Number, error.Class, error.State, lineNumber,
                            Environment.NewLine, error.Message);
                        await SendMessage(message, true);
                    }
                }
            }
            else
            {
                await SendMessage(dbe.Message, true);
            }

            return canIgnore;
        }

        /// <summary>
        /// Aggregates all result sets in the batch into a single special action
        /// </summary>
        /// <returns>The special action of this batch, combined with that of every result set</returns>
        private SpecialAction ProcessResultSetSpecialActions()
        {
            // Take the same lock as every other reader of the list
            lock (resultSets)
            {
                foreach (ResultSet resultSet in resultSets)
                {
                    specialAction.CombineSpecialAction(resultSet.Summary.SpecialAction);
                }
            }

            return specialAction;
        }

        #endregion

        #region IDisposable Implementation

        /// <summary>
        /// Disposes the batch and the result sets it holds.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Disposes every result set of the batch; does nothing once the batch has been disposed.
        /// </summary>
        /// <param name="disposing">Whether the result sets should be disposed</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposed)
            {
                return;
            }

            if (disposing)
            {
                lock (resultSets)
                {
                    foreach (ResultSet r in resultSets)
                    {
                        r.Dispose();
                    }
                }
            }

            disposed = true;
        }

        #endregion
    }
}