WIP update to support batch processing

This commit is contained in:
benrr101
2016-08-18 17:49:16 -07:00
parent dee490341d
commit 7202a7ed65
5 changed files with 264 additions and 156 deletions

View File

@@ -0,0 +1,195 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.SqlClient;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
{
public class Batch
{
private const string RowsAffectedFormat = "({0} row(s) affected)";
#region Properties
/// <summary>
/// The text of batch that will be executed
/// </summary>
public string BatchText { get; set; }
/// <summary>
/// Whether or not the query has an error
/// </summary>
public bool HasError { get; set; }
public bool HasExecuted { get; set; }
/// <summary>
/// Messages that have come back from the server
/// </summary>
public List<string> ResultMessages { get; set; }
/// <summary>
/// The result sets of the query execution
/// </summary>
public List<ResultSet> ResultSets { get; set; }
/// <summary>
/// Property for generating a set result set summaries from the result sets
/// </summary>
public ResultSetSummary[] ResultSummaries
{
get
{
return ResultSets.Select((set, index) => new ResultSetSummary()
{
ColumnInfo = set.Columns,
Id = index,
RowCount = set.Rows.Count
}).ToArray();
}
}
#endregion
public Batch(string batchText)
{
// Sanity check for input
if (string.IsNullOrEmpty(batchText))
{
throw new ArgumentNullException(nameof(batchText), "Query text cannot be null");
}
// Initialize the internal state
BatchText = batchText;
HasExecuted = false;
ResultSets = new List<ResultSet>();
ResultMessages = new List<string>();
}
public async Task Execute(DbConnection conn, CancellationToken cancellationToken)
{
try
{
// Register the message listener to *this instance* of the batch
// Note: This is being done to associate messages with batches
SqlConnection sqlConn = conn as SqlConnection;
if (sqlConn != null)
{
sqlConn.InfoMessage += StoreDbMessage;
}
// Create a command that we'll use for executing the query
using (DbCommand command = conn.CreateCommand())
{
command.CommandText = BatchText;
command.CommandType = CommandType.Text;
// Execute the command to get back a reader
using (DbDataReader reader = await command.ExecuteReaderAsync(cancellationToken))
{
do
{
// Skip this result set if there aren't any rows
if (!reader.HasRows && reader.FieldCount == 0)
{
// Create a message with the number of affected rows -- IF the query affects rows
ResultMessages.Add(reader.RecordsAffected >= 0
? string.Format(RowsAffectedFormat, reader.RecordsAffected)
: "Commad Executed Successfully");
continue;
}
// Read until we hit the end of the result set
ResultSet resultSet = new ResultSet();
while (await reader.ReadAsync(cancellationToken))
{
resultSet.AddRow(reader);
}
// Read off the column schema information
if (reader.CanGetColumnSchema())
{
resultSet.Columns = reader.GetColumnSchema().ToArray();
}
// Add the result set to the results of the query
ResultSets.Add(resultSet);
} while (await reader.NextResultAsync(cancellationToken));
}
}
}
catch (DbException dbe)
{
HasError = true;
UnwrapDbException(dbe);
}
catch (Exception)
{
HasError = true;
throw;
}
finally
{
// Remove the message event handler from the connection
SqlConnection sqlConn = conn as SqlConnection;
if (sqlConn != null)
{
sqlConn.InfoMessage -= StoreDbMessage;
}
// Mark that we have executed
HasExecuted = true;
}
}
#region Private Helpers
/// <summary>
/// Delegate handler for storing messages that are returned from the server
/// NOTE: Only messages that are below a certain severity will be returned via this
/// mechanism. Anything above that level will trigger an exception.
/// </summary>
/// <param name="sender">Object that fired the event</param>
/// <param name="args">Arguments from the event</param>
private void StoreDbMessage(object sender, SqlInfoMessageEventArgs args)
{
ResultMessages.Add(args.Message);
}
/// <summary>
/// Attempts to convert a <see cref="DbException"/> to a <see cref="SqlException"/> that
/// contains much more info about Sql Server errors. The exception is then unwrapped and
/// messages are formatted and stored in <see cref="ResultMessages"/>. If the exception
/// cannot be converted to SqlException, the message is written to the messages list.
/// </summary>
/// <param name="dbe">The exception to unwrap</param>
private void UnwrapDbException(DbException dbe)
{
SqlException se = dbe as SqlException;
if (se != null)
{
foreach (var error in se.Errors)
{
SqlError sqlError = error as SqlError;
if (sqlError != null)
{
string message = String.Format("Msg {0}, Level {1}, State {2}, Line {3}{4}{5}",
sqlError.Number, sqlError.Class, sqlError.State, sqlError.LineNumber,
Environment.NewLine, sqlError.Message);
ResultMessages.Add(message);
}
}
}
else
{
ResultMessages.Add(dbe.Message);
}
}
#endregion
}
}

View File

@@ -0,0 +1,33 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
/// <summary>
/// Summary of a batch within a query
/// </summary>
public class BatchSummary
{
/// <summary>
/// Whether or not the batch was successful. True indicates errors, false indicates success
/// </summary>
public bool HasError { get; set; }
/// <summary>
/// The ID of the result set within the query results
/// </summary>
public int Id { get; set; }
/// <summary>
/// Any messages that came back from the server during execution of the batch
/// </summary>
public string[] Messages { get; set; }
/// <summary>
/// The summaries of the result sets inside the batch
/// </summary>
public ResultSetSummary[] ResultSetSummaries { get; set; }
}
}

View File

@@ -22,15 +22,10 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
/// </summary>
public string[] Messages { get; set; }
/// <summary>
/// Whether or not the query was successful. True indicates errors, false indicates success
/// </summary>
public bool HasError { get; set; }
/// <summary>
/// Summaries of the result sets that were returned with the query
/// </summary>
public ResultSetSummary[] ResultSetSummaries { get; set; }
public BatchSummary[] BatchSummaries { get; set; }
}
public class QueryExecuteCompleteEvent

View File

@@ -13,7 +13,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
public class ResultSetSummary
{
/// <summary>
/// The ID of the result set within the query results
/// The ID of the result set within the batch results
/// </summary>
public int Id { get; set; }

View File

@@ -5,12 +5,11 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.SqlClient;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Management.SqlParser.Parser;
using Microsoft.SqlTools.ServiceLayer.Connection;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
@@ -23,6 +22,25 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
{
#region Properties
/// <summary>
/// The batches underneath this query
/// </summary>
private IEnumerable<Batch> Batches { get; set; }
/// <summary>
/// The summaries of the batches underneath this query
/// </summary>
public BatchSummary[] BatchSummaries
{
get { return Batches.Select((batch, index) => new BatchSummary
{
Id = index,
HasError = batch.HasError,
Messages = batch.ResultMessages.ToArray(),
ResultSetSummaries = batch.ResultSummaries
}).ToArray(); }
}
/// <summary>
/// Cancellation token source, used for cancelling async db actions
/// </summary>
@@ -34,47 +52,19 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// </summary>
public ConnectionInfo EditorConnection { get; set; }
/// <summary>
/// Whether or not the query has an error
/// </summary>
public bool HasError { get; set; }
/// <summary>
/// Whether or not the query has completed executed, regardless of success or failure
/// </summary>
public bool HasExecuted { get; set; }
public bool HasExecuted
{
get { return Batches.All(b => b.HasExecuted); }
}
/// <summary>
/// The text of the query to execute
/// </summary>
public string QueryText { get; set; }
/// <summary>
/// Messages that have come back from the server
/// </summary>
public List<string> ResultMessages { get; set; }
/// <summary>
/// The result sets of the query execution
/// </summary>
public List<ResultSet> ResultSets { get; set; }
/// <summary>
/// Property for generating a set result set summaries from the result sets
/// </summary>
public ResultSetSummary[] ResultSummary
{
get
{
return ResultSets.Select((set, index) => new ResultSetSummary
{
ColumnInfo = set.Columns,
Id = index,
RowCount = set.Rows.Count
}).ToArray();
}
}
#endregion
/// <summary>
@@ -97,10 +87,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
// Initialize the internal state
QueryText = queryText;
EditorConnection = connection;
HasExecuted = false;
ResultSets = new List<ResultSet>();
ResultMessages = new List<string>();
cancellationSource = new CancellationTokenSource();
// Process the query into batches
ParseResult parseResult = Parser.Parse(queryText);
Batches = parseResult.Script.Batches.Select(b => new Batch(b.Sql));
}
/// <summary>
@@ -114,80 +105,16 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
throw new InvalidOperationException("Query has already executed.");
}
DbConnection conn = null;
// Create a connection from the connection details
try
// Open up a connection for querying the database
string connectionString = ConnectionService.BuildConnectionString(EditorConnection.ConnectionDetails);
using (DbConnection conn = EditorConnection.Factory.CreateSqlConnection(connectionString))
{
string connectionString = ConnectionService.BuildConnectionString(EditorConnection.ConnectionDetails);
using (conn = EditorConnection.Factory.CreateSqlConnection(connectionString))
// We need these to execute synchronously, otherwise the user will be very unhappy
foreach (Batch b in Batches)
{
// If we have the message listener, bind to it
SqlConnection sqlConn = conn as SqlConnection;
if (sqlConn != null)
{
sqlConn.InfoMessage += StoreDbMessage;
}
await conn.OpenAsync(cancellationSource.Token);
// Create a command that we'll use for executing the query
using (DbCommand command = conn.CreateCommand())
{
command.CommandText = QueryText;
command.CommandType = CommandType.Text;
// Execute the command to get back a reader
using (DbDataReader reader = await command.ExecuteReaderAsync(cancellationSource.Token))
{
do
{
// Create a message with the number of affected rows
if (reader.RecordsAffected >= 0)
{
ResultMessages.Add(String.Format("({0} row(s) affected)", reader.RecordsAffected));
}
if (!reader.HasRows && reader.FieldCount == 0)
{
continue;
}
// Read until we hit the end of the result set
ResultSet resultSet = new ResultSet();
while (await reader.ReadAsync(cancellationSource.Token))
{
resultSet.AddRow(reader);
}
// Read off the column schema information
if (reader.CanGetColumnSchema())
{
resultSet.Columns = reader.GetColumnSchema().ToArray();
}
// Add the result set to the results of the query
ResultSets.Add(resultSet);
} while (await reader.NextResultAsync(cancellationSource.Token));
}
}
await b.Execute(conn, cancellationSource.Token);
}
}
catch (DbException dbe)
{
HasError = true;
UnwrapDbException(dbe);
}
catch (Exception)
{
HasError = true;
throw;
}
finally
{
// Mark that we have executed
HasExecuted = true;
}
}
/// <summary>
@@ -246,48 +173,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
cancellationSource.Cancel();
}
/// <summary>
/// Delegate handler for storing messages that are returned from the server
/// NOTE: Only messages that are below a certain severity will be returned via this
/// mechanism. Anything above that level will trigger an exception.
/// </summary>
/// <param name="sender">Object that fired the event</param>
/// <param name="args">Arguments from the event</param>
private void StoreDbMessage(object sender, SqlInfoMessageEventArgs args)
{
ResultMessages.Add(args.Message);
}
/// <summary>
/// Attempts to convert a <see cref="DbException"/> to a <see cref="SqlException"/> that
/// contains much more info about Sql Server errors. The exception is then unwrapped and
/// messages are formatted and stored in <see cref="ResultMessages"/>. If the exception
/// cannot be converted to SqlException, the message is written to the messages list.
/// </summary>
/// <param name="dbe">The exception to unwrap</param>
private void UnwrapDbException(DbException dbe)
{
SqlException se = dbe as SqlException;
if (se != null)
{
foreach (var error in se.Errors)
{
SqlError sqlError = error as SqlError;
if (sqlError != null)
{
string message = String.Format("Msg {0}, Level {1}, State {2}, Line {3}{4}{5}",
sqlError.Number, sqlError.Class, sqlError.State, sqlError.LineNumber,
Environment.NewLine, sqlError.Message);
ResultMessages.Add(message);
}
}
}
else
{
ResultMessages.Add(dbe.Message);
}
}
#region IDisposable Implementation
private bool disposed;