WIP for QueryExecution, mostly complete

This commit is contained in:
Benjamin Russell
2016-08-04 14:48:58 -07:00
parent 1618b77790
commit 3ba22c94ac
12 changed files with 338 additions and 191 deletions

View File

@@ -6,14 +6,14 @@
using System;
using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
/// <summary>
/// Parameters for the query dispose request
/// </summary>
public class QueryDisposeParams
{
public Guid QueryId { get; set; }
public string OwnerUri { get; set; }
}
/// <summary>

View File

@@ -1,12 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
public class QueryExecuteCompleteNotification
public class QueryExecuteCompleteParams
{
/// <summary>
/// URI for the editor that owns the query
/// </summary>
public string OwnerUri { get; set; }
/// <summary>
/// Any messages that came back from the server during execution of the query
/// </summary>
@@ -22,4 +24,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
/// </summary>
public ResultSetSummary[] ResultSetSummaries { get; set; }
}
public class QueryExecuteCompleteEvent
{
public static readonly
EventType<QueryExecuteCompleteParams> Type =
EventType<QueryExecuteCompleteParams>.Create("query/complete");
}
}

View File

@@ -6,7 +6,7 @@
using System;
using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
/// <summary>
/// Parameters for the query execute request

View File

@@ -6,7 +6,7 @@
using System;
using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
/// <summary>
/// Parameters for a query result subset retrieval request
@@ -16,7 +16,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
/// <summary>
/// ID of the query to look up the results for
/// </summary>
public Guid QueryId { get; set; }
public string OwnerId { get; set; }
/// <summary>
/// Index of the result set to get the results from
@@ -38,11 +38,12 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
}
/// <summary>
///
/// Parameters for the result of a subset retrieval request
/// </summary>
public class QueryExecuteSubsetResult
{
public string Message { get; set; }
public ResultSetSubset ResultSubset { get; set; }
}
public class QueryExecuteSubsetRequest

View File

@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
public class ResultSetSubset
{
public int RowCount { get; set; }
public object[][] Rows { get; set; }
}
}

View File

@@ -1,10 +1,6 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
using System.Data.Common;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
public class ResultSetSummary
{
@@ -16,11 +12,11 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
/// <summary>
/// The number of rows that was returned with the resultset
/// </summary>
public long RowCount { get; set; }
public int RowCount { get; set; }
/// <summary>
/// Details about the columns that are provided as solutions
/// </summary>
public DbColumn ColumnInfo { get; set; }
public DbColumn[] ColumnInfo { get; set; }
}
}

View File

@@ -0,0 +1,144 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.Connection;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
{
public class Query //: IDisposable
{
#region Properties
public string QueryText { get; set; }
public ConnectionInfo EditorConnection { get; set; }
private readonly CancellationTokenSource cancellationSource;
public List<ResultSet> ResultSets { get; set; }
public ResultSetSummary[] ResultSummary
{
get
{
return ResultSets.Select((set, index) => new ResultSetSummary
{
ColumnInfo = set.Columns,
Id = index,
RowCount = set.Rows.Count
}).ToArray();
}
}
public bool HasExecuted { get; set; }
#endregion
public Query(string queryText, ConnectionInfo connection)
{
// Sanity check for input
if (queryText == null)
{
throw new ArgumentNullException(nameof(queryText), "Query text cannot be null");
}
if (connection == null)
{
throw new ArgumentNullException(nameof(connection), "Connection cannot be null");
}
// Initialize the internal state
QueryText = queryText;
EditorConnection = connection;
HasExecuted = false;
ResultSets = new List<ResultSet>();
cancellationSource = new CancellationTokenSource();
}
public async Task Execute()
{
// Sanity check to make sure we haven't already run this query
if (HasExecuted)
{
throw new InvalidOperationException("Query has already executed.");
}
// Create a connection from the connection details
using (DbConnection conn = EditorConnection.Factory.CreateSqlConnection(EditorConnection.ConnectionDetails))
{
await conn.OpenAsync(cancellationSource.Token);
// Create a command that we'll use for executing the query
using (DbCommand command = conn.CreateCommand())
{
command.CommandText = QueryText;
command.CommandType = CommandType.Text;
// Execute the command to get back a reader
using (DbDataReader reader = await command.ExecuteReaderAsync(cancellationSource.Token))
{
do
{
// Create a new result set that we'll use to store all the data
ResultSet resultSet = new ResultSet();
if (reader.CanGetColumnSchema())
{
resultSet.Columns = reader.GetColumnSchema().ToArray();
}
// Read until we hit the end of the result set
while (await reader.ReadAsync(cancellationSource.Token))
{
resultSet.AddRow(reader);
}
// Add the result set to the results of the query
ResultSets.Add(resultSet);
} while (await reader.NextResultAsync(cancellationSource.Token));
}
}
}
// Mark that we have executed
HasExecuted = true;
}
public ResultSetSubset GetSubset(int resultSetIndex, int startRow, int rowCount)
{
// Sanity check that the results are available
if (!HasExecuted)
{
throw new InvalidOperationException("The query has not completed, yet.");
}
// Sanity check to make sure we have valid numbers
if (resultSetIndex < 0 || resultSetIndex >= ResultSets.Count)
{
throw new ArgumentOutOfRangeException(nameof(resultSetIndex), "Result set index cannot be less than 0" +
"or greater than the number of result sets");
}
ResultSet targetResultSet = ResultSets[resultSetIndex];
if (startRow < 0 || startRow >= targetResultSet.Rows.Count)
{
throw new ArgumentOutOfRangeException(nameof(startRow), "Start row cannot be less than 0 " +
"or greater than the number of rows in the resultset");
}
if (rowCount <= 0)
{
throw new ArgumentOutOfRangeException(nameof(rowCount), "Row count must be a positive integer");
}
// Retrieve the subset of the results as per the request
object[][] rows = targetResultSet.Rows.Skip(startRow).Take(rowCount).ToArray();
return new ResultSetSubset
{
Rows = rows,
RowCount = rows.Length
};
}
}
}

View File

@@ -0,0 +1,153 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.Connection;
using Microsoft.SqlTools.ServiceLayer.Hosting;
using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
{
public sealed class QueryExecutionService
{
#region Singleton Instance Implementation
private static readonly Lazy<QueryExecutionService> instance = new Lazy<QueryExecutionService>(() => new QueryExecutionService());
public static QueryExecutionService Instance
{
get { return instance.Value; }
}
private QueryExecutionService() { }
#endregion
#region Properties
private readonly Lazy<ConcurrentDictionary<string, Query>> queries =
new Lazy<ConcurrentDictionary<string, Query>>(() => new ConcurrentDictionary<string, Query>());
private ConcurrentDictionary<string, Query> ActiveQueries
{
get { return queries.Value; }
}
#endregion
#region Public Methods
/// <summary>
///
/// </summary>
/// <param name="serviceHost"></param>
public void InitializeService(ServiceHost serviceHost)
{
// Register handlers for requests
serviceHost.SetRequestHandler(QueryExecuteRequest.Type, HandleExecuteRequest);
serviceHost.SetRequestHandler(QueryExecuteSubsetRequest.Type, HandleResultSubsetRequest);
serviceHost.SetRequestHandler(QueryDisposeRequest.Type, HandleDisposeRequest);
// Register handlers for events
}
#endregion
#region Request Handlers
private async Task HandleExecuteRequest(QueryExecuteParams executeParams,
RequestContext<QueryExecuteResult> requestContext)
{
// Attempt to get the connection for the editor
ConnectionInfo connectionInfo;
if(!ConnectionService.Instance.TryFindConnection(executeParams.OwnerUri, out connectionInfo))
{
await requestContext.SendError("This editor is not connected to a database.");
return;
}
// If there is already an in-flight query, error out
Query newQuery = new Query(executeParams.QueryText, connectionInfo);
if (!ActiveQueries.TryAdd(executeParams.OwnerUri, newQuery))
{
await requestContext.SendError("A query is already in progress for this editor session." +
"Please cancel this query or wait for its completion.");
return;
}
// Launch the query and respond with successfully launching it
Task executeTask = newQuery.Execute();
await requestContext.SendResult(new QueryExecuteResult
{
Messages = null
});
// Wait for query execution and then send back the results
await Task.WhenAll(executeTask);
QueryExecuteCompleteParams eventParams = new QueryExecuteCompleteParams
{
Error = false,
Messages = new string[]{}, // TODO: Figure out how to get messages back from the server
OwnerUri = executeParams.OwnerUri,
ResultSetSummaries = newQuery.ResultSummary
};
await requestContext.SendEvent(QueryExecuteCompleteEvent.Type, eventParams);
}
private async Task HandleResultSubsetRequest(QueryExecuteSubsetParams subsetParams,
RequestContext<QueryExecuteSubsetResult> requestContext)
{
// Attempt to load the query
Query query;
if (!ActiveQueries.TryGetValue(subsetParams.OwnerId, out query))
{
var errorResult = new QueryExecuteSubsetResult
{
Message = "The requested query does not exist."
};
await requestContext.SendResult(errorResult);
return;
}
try
{
// Retrieve the requested subset and return it
var result = new QueryExecuteSubsetResult
{
Message = null,
ResultSubset = query.GetSubset(
subsetParams.ResultSetIndex, subsetParams.RowsStartIndex, subsetParams.RowsCount)
};
await requestContext.SendResult(result);
}
catch (Exception e)
{
await requestContext.SendResult(new QueryExecuteSubsetResult
{
Message = e.Message
});
}
}
private async Task HandleDisposeRequest(QueryDisposeParams disposeParams,
RequestContext<QueryDisposeResult> requestContext)
{
// Attempt to remove the query for the owner uri
Query result;
if (!ActiveQueries.TryRemove(disposeParams.OwnerUri, out result))
{
await requestContext.SendError("Failed to dispose query, ID not found.");
return;
}
// Success
await requestContext.SendResult(new QueryDisposeResult
{
Messages = null
});
}
#endregion
}
}

View File

@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Data.Common;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts
{
public class ResultSet
{

View File

@@ -1,11 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts
{
public class ResultSetSubset
{
}
}

View File

@@ -1,69 +0,0 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices
{
public class Query //: IDisposable
{
public string QueryText { get; set; }
public DbConnection SqlConnection { get; set; }
private readonly CancellationTokenSource cancellationSource;
public List<ResultSet> ResultSets { get; set; }
public Query(string queryText, DbConnection connection)
{
QueryText = queryText;
SqlConnection = connection;
ResultSets = new List<ResultSet>();
cancellationSource = new CancellationTokenSource();
}
public async Task Execute()
{
// Open the connection, if it's not already open
if ((SqlConnection.State & ConnectionState.Open) == 0)
{
await SqlConnection.OpenAsync(cancellationSource.Token);
}
// Create a command that we'll use for executing the query
using (DbCommand command = SqlConnection.CreateCommand())
{
command.CommandText = QueryText;
command.CommandType = CommandType.Text;
// Execute the command to get back a reader
using (DbDataReader reader = await command.ExecuteReaderAsync(cancellationSource.Token))
{
do
{
// Create a new result set that we'll use to store all the data
ResultSet resultSet = new ResultSet();
if (reader.CanGetColumnSchema())
{
resultSet.Columns = reader.GetColumnSchema().ToArray();
}
// Read until we hit the end of the result set
while (await reader.ReadAsync(cancellationSource.Token))
{
resultSet.AddRow(reader);
}
// Add the result set to the results of the query
ResultSets.Add(resultSet);
} while (await reader.NextResultAsync(cancellationSource.Token));
}
}
}
}
}

View File

@@ -1,89 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.Hosting;
using Microsoft.SqlTools.ServiceLayer.Hosting.Protocol;
using Microsoft.SqlTools.ServiceLayer.QueryExecutionServices.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecutionServices
{
public sealed class QueryExecutionService
{
#region Singleton Instance Implementation
private static readonly Lazy<QueryExecutionService> instance = new Lazy<QueryExecutionService>(() => new QueryExecutionService());
public static QueryExecutionService Instance
{
get { return instance.Value; }
}
private QueryExecutionService() { }
#endregion
#region Properties
private readonly Lazy<ConcurrentDictionary<string, Query>> queries =
new Lazy<ConcurrentDictionary<string, Query>>(() => new ConcurrentDictionary<string, Query>());
private ConcurrentDictionary<string, Query> Queries
{
get { return queries.Value; }
}
#endregion
#region Public Methods
/// <summary>
///
/// </summary>
/// <param name="serviceHost"></param>
public void InitializeService(ServiceHost serviceHost)
{
// Register handlers for requests
serviceHost.SetRequestHandler(QueryExecuteRequest.Type, HandleExecuteRequest);
serviceHost.SetRequestHandler(QueryExecuteSubsetRequest.Type, HandleResultSubsetRequest);
serviceHost.SetRequestHandler(QueryDisposeRequest.Type, HandleDisposeRequest);
// Register handlers for events
}
#endregion
#region Request Handlers
private async Task HandleExecuteRequest(QueryExecuteParams executeParams,
RequestContext<QueryExecuteResult> requestContext)
{
}
private async Task HandleResultSubsetRequest(QueryExecuteSubsetParams subsetParams,
RequestContext<QueryExecuteSubsetResult> requestContext)
{
await Task.FromResult(0);
}
private async Task HandleDisposeRequest(QueryDisposeParams disposeParams,
RequestContext<QueryDisposeResult> requestContext)
{
string messages = null;
Query result;
if (!Queries.TryRemove(disposeParams., out result))
{
messages = "Failed to dispose query, ID not found.";
}
await requestContext.SendResult(new QueryDisposeResult
{
Messages = messages
});
}
#endregion
}
}