mirror of
https://github.com/ckaczor/sqltoolsservice.git
synced 2026-01-13 17:23:02 -05:00
This changes adds the following two notifications from the results processing within a batch. These new notifications allows a consumer to stream results from a resultset instead of getting them all at once after the entire resultset has been fetched. ResultsAvailable This is issued after at least 1 row has been fetched for this resultset. ResultsUpdated This is issued periodically as more rows are available on this resultset. The final send of this notification when all rows have been fetched has the property 'Complete' set to true in the ResultSummary object. Detailed Change Log: * Initial completed implementation of QueryResults stream feature. 3 unittests still need fixing * Fix for the 3 failing test. I will look into making MockBehavior strict again for the three tests later * Making GetReader/GetWriter use filestream objects in FileShare.ReadWrite mode so the file can be concurrently read and written * Changing resultsAvailable also to fire off on a timer instead of after 1st row * adding a project for clr TableValuedFunction to produce result set with delays after each row. This is helpful in end to end testing. * Fixing up some tests and simplifying implementation of result update timer * Address review comments * Some test fixes * Disabled flaky test verification
449 lines
19 KiB
C#
449 lines
19 KiB
C#
//
|
|
// Copyright (c) Microsoft. All rights reserved.
|
|
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
|
//
|
|
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.IO;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.SqlTools.Hosting.Channels;
|
|
using Microsoft.SqlTools.Hosting.Contracts;
|
|
using Microsoft.SqlTools.Hosting.Contracts.Internal;
|
|
using Microsoft.SqlTools.Hosting.Utility;
|
|
using Microsoft.SqlTools.Hosting.v2;
|
|
|
|
namespace Microsoft.SqlTools.Hosting.Protocol
|
|
{
|
|
public class JsonRpcHost : IJsonRpcHost
|
|
{
|
|
#region Private Fields
|
|
|
|
internal readonly CancellationTokenSource cancellationTokenSource;
|
|
|
|
private readonly CancellationToken consumeInputCancellationToken;
|
|
private readonly CancellationToken consumeOutputCancellationToken;
|
|
|
|
internal readonly BlockingCollection<Message> outputQueue;
|
|
internal readonly Dictionary<string, Func<Message, Task>> eventHandlers;
|
|
internal readonly Dictionary<string, Func<Message, Task>> requestHandlers;
|
|
internal readonly ConcurrentDictionary<string, TaskCompletionSource<Message>> pendingRequests;
|
|
internal readonly ChannelBase protocolChannel;
|
|
|
|
internal Task consumeInputTask;
|
|
internal Task consumeOutputTask;
|
|
private bool isStarted;
|
|
|
|
#endregion
|
|
|
|
public JsonRpcHost(ChannelBase channel)
|
|
{
|
|
Validate.IsNotNull(nameof(channel), channel);
|
|
|
|
cancellationTokenSource = new CancellationTokenSource();
|
|
consumeInputCancellationToken = cancellationTokenSource.Token;
|
|
consumeOutputCancellationToken = cancellationTokenSource.Token;
|
|
outputQueue = new BlockingCollection<Message>(new ConcurrentQueue<Message>());
|
|
protocolChannel = channel;
|
|
|
|
eventHandlers = new Dictionary<string, Func<Message, Task>>();
|
|
requestHandlers = new Dictionary<string, Func<Message, Task>>();
|
|
pendingRequests = new ConcurrentDictionary<string, TaskCompletionSource<Message>>();
|
|
}
|
|
|
|
#region Start/Stop Methods
|
|
|
|
/// <summary>
|
|
/// Starts the JSON RPC host using the protocol channel that was provided
|
|
/// </summary>
|
|
public void Start()
|
|
{
|
|
// If we've already started, we can't start up again
|
|
if (isStarted)
|
|
{
|
|
throw new InvalidOperationException(SR.HostingJsonRpcHostAlreadyStarted);
|
|
}
|
|
|
|
// Make sure no other calls try to start the endpoint during startup
|
|
isStarted = true;
|
|
|
|
// Initialize the protocol channel
|
|
protocolChannel.Start();
|
|
protocolChannel.WaitForConnection().Wait();
|
|
|
|
// Start the input and output consumption threads
|
|
consumeInputTask = ConsumeInput();
|
|
consumeOutputTask = ConsumeOutput();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Stops the JSON RPC host and the underlying protocol channel
|
|
/// </summary>
|
|
public void Stop()
|
|
{
|
|
// If we haven't started, we can't stop
|
|
if (!isStarted)
|
|
{
|
|
throw new InvalidOperationException(SR.HostingJsonRpcHostNotStarted);
|
|
}
|
|
|
|
// Make sure no future calls try to stop the endpoint during shutdown
|
|
isStarted = false;
|
|
|
|
// Shutdown the host
|
|
cancellationTokenSource.Cancel();
|
|
protocolChannel.Stop();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Waits for input and output threads to naturally exit
|
|
/// </summary>
|
|
/// <exception cref="InvalidOperationException">Thrown if the host has not started</exception>
|
|
public void WaitForExit()
|
|
{
|
|
// If we haven't started everything, we can't wait for exit
|
|
if (!isStarted)
|
|
{
|
|
throw new InvalidOperationException(SR.HostingJsonRpcHostNotStarted);
|
|
}
|
|
|
|
// Join the input and output threads to this thread
|
|
Task.WaitAll(consumeInputTask, consumeOutputTask);
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Public Methods
|
|
|
|
/// <summary>
|
|
/// Sends an event, independent of any request
|
|
/// </summary>
|
|
/// <typeparam name="TParams">Event parameter type</typeparam>
|
|
/// <param name="eventType">Type of event being sent</param>
|
|
/// <param name="eventParams">Event parameters being sent</param>
|
|
/// <returns>Task that tracks completion of the send operation.</returns>
|
|
public void SendEvent<TParams>(
|
|
EventType<TParams> eventType,
|
|
TParams eventParams)
|
|
{
|
|
if (!protocolChannel.IsConnected)
|
|
{
|
|
throw new InvalidOperationException("SendEvent called when ProtocolChannel was not yet connected");
|
|
}
|
|
|
|
// Create a message from the event provided
|
|
Message message = Message.CreateEvent(eventType, eventParams);
|
|
outputQueue.Add(message);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Sends a request, independent of any request
|
|
/// </summary>
|
|
/// <param name="requestType">Configuration of the request that is being sent</param>
|
|
/// <param name="requestParams">Contents of the request</param>
|
|
/// <typeparam name="TParams">Type of the message contents</typeparam>
|
|
/// <typeparam name="TResult">Type of the contents of the expected result of the request</typeparam>
|
|
/// <returns>Task that is completed when the </returns>
|
|
/// TODO: This doesn't properly handle error responses scenarios.
|
|
public async Task<TResult> SendRequest<TParams, TResult>(
|
|
RequestType<TParams, TResult> requestType,
|
|
TParams requestParams)
|
|
{
|
|
if (!protocolChannel.IsConnected)
|
|
{
|
|
throw new InvalidOperationException("SendRequest called when ProtocolChannel was not yet connected");
|
|
}
|
|
|
|
// Add a task completion source for the request's response
|
|
string messageId = Guid.NewGuid().ToString();
|
|
TaskCompletionSource<Message> responseTask = new TaskCompletionSource<Message>();
|
|
pendingRequests.TryAdd(messageId, responseTask);
|
|
|
|
// Send the request
|
|
outputQueue.Add(Message.CreateRequest(requestType, messageId, requestParams));
|
|
|
|
// Wait for the response
|
|
Message responseMessage = await responseTask.Task;
|
|
|
|
return responseMessage.GetTypedContents<TResult>();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Sets the handler for an event with a given configuration
|
|
/// </summary>
|
|
/// <param name="eventType">Configuration of the event</param>
|
|
/// <param name="eventHandler">Function for handling the event</param>
|
|
/// <param name="overrideExisting">Whether or not to override any existing event handler for this method</param>
|
|
/// <typeparam name="TParams">Type of the parameters for the event</typeparam>
|
|
public void SetAsyncEventHandler<TParams>(
|
|
EventType<TParams> eventType,
|
|
Func<TParams, EventContext, Task> eventHandler,
|
|
bool overrideExisting = false)
|
|
{
|
|
Validate.IsNotNull(nameof(eventType), eventType);
|
|
Validate.IsNotNull(nameof(eventHandler), eventHandler);
|
|
|
|
if (overrideExisting)
|
|
{
|
|
// Remove the existing handler so a new one can be set
|
|
eventHandlers.Remove(eventType.MethodName);
|
|
}
|
|
|
|
Func<Message, Task> handler = eventMessage =>
|
|
eventHandler(eventMessage.GetTypedContents<TParams>(), new EventContext(outputQueue));
|
|
|
|
eventHandlers.Add(eventType.MethodName, handler);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a Func based that wraps the action in a task and calls the Func-based overload
|
|
/// </summary>
|
|
/// <param name="eventType">Configuration of the event</param>
|
|
/// <param name="eventHandler">Function for handling the event</param>
|
|
/// <param name="overrideExisting">Whether or not to override any existing event handler for this method</param>
|
|
/// <typeparam name="TParams">Type of the parameters for the event</typeparam>
|
|
public void SetEventHandler<TParams>(
|
|
EventType<TParams> eventType,
|
|
Action<TParams, EventContext> eventHandler,
|
|
bool overrideExisting = false)
|
|
{
|
|
Validate.IsNotNull(nameof(eventHandler), eventHandler);
|
|
Func<TParams, EventContext, Task> eventFunc = (p, e) => Task.Run(() => eventHandler(p, e));
|
|
SetAsyncEventHandler(eventType, eventFunc, overrideExisting);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Sets the handler for a request with a given configuration
|
|
/// </summary>
|
|
/// <param name="requestType">Configuration of the request</param>
|
|
/// <param name="requestHandler">Function for handling the request</param>
|
|
/// <param name="overrideExisting">Whether or not to override any existing request handler for this method</param>
|
|
/// <typeparam name="TParams">Type of the parameters for the request</typeparam>
|
|
/// <typeparam name="TResult">Type of the parameters for the response</typeparam>
|
|
public void SetAsyncRequestHandler<TParams, TResult>(
|
|
RequestType<TParams, TResult> requestType,
|
|
Func<TParams, RequestContext<TResult>, Task> requestHandler,
|
|
bool overrideExisting = false)
|
|
{
|
|
Validate.IsNotNull(nameof(requestType), requestType);
|
|
Validate.IsNotNull(nameof(requestHandler), requestHandler);
|
|
|
|
if (overrideExisting)
|
|
{
|
|
// Remove the existing handler so a new one can be set
|
|
requestHandlers.Remove(requestType.MethodName);
|
|
}
|
|
|
|
// Setup the wrapper around the handler
|
|
Func<Message, Task> handler = requestMessage =>
|
|
requestHandler(requestMessage.GetTypedContents<TParams>(), new RequestContext<TResult>(requestMessage, outputQueue));
|
|
|
|
requestHandlers.Add(requestType.MethodName, handler);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a Func based that wraps the action in a task and calls the Func-based overload
|
|
/// </summary>
|
|
/// /// <param name="requestType">Configuration of the request</param>
|
|
/// <param name="requestHandler">Function for handling the request</param>
|
|
/// <param name="overrideExisting">Whether or not to override any existing request handler for this method</param>
|
|
/// <typeparam name="TParams">Type of the parameters for the request</typeparam>
|
|
/// <typeparam name="TResult">Type of the parameters for the response</typeparam>
|
|
public void SetRequestHandler<TParams, TResult>(
|
|
RequestType<TParams, TResult> requestType,
|
|
Action<TParams, RequestContext<TResult>> requestHandler,
|
|
bool overrideExisting = false)
|
|
{
|
|
Validate.IsNotNull(nameof(requestHandler), requestHandler);
|
|
Func<TParams, RequestContext<TResult>, Task> requestFunc = (p, e) => Task.Run(() => requestHandler(p, e));
|
|
SetAsyncRequestHandler(requestType, requestFunc, overrideExisting);
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Message Processing Tasks
|
|
|
|
internal Task ConsumeInput()
|
|
{
|
|
return Task.Factory.StartNew(async () =>
|
|
{
|
|
while (!consumeInputCancellationToken.IsCancellationRequested)
|
|
{
|
|
Message incomingMessage;
|
|
try
|
|
{
|
|
// Read message from the input channel
|
|
incomingMessage = await protocolChannel.MessageReader.ReadMessage();
|
|
}
|
|
catch (EndOfStreamException)
|
|
{
|
|
// The stream has ended, end the input message loop
|
|
break;
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
// Log the error and send an error event to the client
|
|
string message = $"Exception occurred while receiving input message: {e.Message}";
|
|
Logger.Write(TraceEventType.Error, message);
|
|
|
|
// TODO: Add event to output queue, and unit test it
|
|
|
|
// Continue the loop
|
|
continue;
|
|
}
|
|
|
|
// Verbose logging
|
|
string logMessage =
|
|
$"Received message with Id[{incomingMessage.Id}] of type[{incomingMessage.MessageType}] and method[{incomingMessage.Method}]";
|
|
Logger.Write(TraceEventType.Verbose, logMessage);
|
|
|
|
// Process the message
|
|
try
|
|
{
|
|
await DispatchMessage(incomingMessage);
|
|
}
|
|
catch (MethodHandlerDoesNotExistException)
|
|
{
|
|
// Method could not be handled, if the message was a request, send an error back to the client
|
|
// TODO: Localize
|
|
string mnfLogMessage =
|
|
$"Failed to find method handler for type[{incomingMessage.MessageType}] and method[{incomingMessage.Method}]";
|
|
Logger.Write(TraceEventType.Warning, mnfLogMessage);
|
|
|
|
if (incomingMessage.MessageType == MessageType.Request)
|
|
{
|
|
// TODO: Localize
|
|
Error mnfError = new Error {Code = -32601, Message = "Method not found"};
|
|
Message errorMessage = Message.CreateResponseError(incomingMessage.Id, mnfError);
|
|
outputQueue.Add(errorMessage, consumeInputCancellationToken);
|
|
}
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
// General errors should be logged but not halt the processing loop
|
|
string geLogMessage =
|
|
$"Exception thrown when handling message of type[{incomingMessage.MessageType}] and method[{incomingMessage.Method}]: {e}";
|
|
Logger.Write(TraceEventType.Error, geLogMessage);
|
|
// TODO: Should we be returning a response for failing requests?
|
|
}
|
|
}
|
|
|
|
Logger.Write(TraceEventType.Warning, "Exiting consume input loop!");
|
|
}, consumeOutputCancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
|
|
}
|
|
|
|
internal Task ConsumeOutput()
|
|
{
|
|
return Task.Factory.StartNew(async () =>
|
|
{
|
|
while (!consumeOutputCancellationToken.IsCancellationRequested)
|
|
{
|
|
Message outgoingMessage;
|
|
try
|
|
{
|
|
// Read message from the output queue
|
|
outgoingMessage = outputQueue.Take(consumeOutputCancellationToken);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
// Cancelled during taking, end the loop
|
|
break;
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
// If we hit an exception here, it is unrecoverable
|
|
string message = string.Format("Unexpected occurred while receiving output message: {0}", e.Message);
|
|
Logger.Write(TraceEventType.Error, message);
|
|
|
|
break;
|
|
}
|
|
|
|
// Send the message
|
|
string logMessage = string.Format("Sending message of type[{0}] and method[{1}]",
|
|
outgoingMessage.MessageType, outgoingMessage.Method);
|
|
Logger.Write(TraceEventType.Verbose, logMessage);
|
|
|
|
await protocolChannel.MessageWriter.WriteMessage(outgoingMessage);
|
|
}
|
|
Logger.Write(TraceEventType.Warning, "Exiting consume output loop!");
|
|
}, consumeOutputCancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
|
|
}
|
|
|
|
internal async Task DispatchMessage(Message messageToDispatch)
|
|
{
|
|
Task handlerToAwait = null;
|
|
|
|
switch (messageToDispatch.MessageType)
|
|
{
|
|
case MessageType.Request:
|
|
Func<Message, Task> requestHandler;
|
|
if (requestHandlers.TryGetValue(messageToDispatch.Method, out requestHandler))
|
|
{
|
|
handlerToAwait = requestHandler(messageToDispatch);
|
|
}
|
|
else
|
|
{
|
|
throw new MethodHandlerDoesNotExistException(MessageType.Request, messageToDispatch.Method);
|
|
}
|
|
break;
|
|
case MessageType.Response:
|
|
TaskCompletionSource<Message> requestTask;
|
|
if (pendingRequests.TryRemove(messageToDispatch.Id, out requestTask))
|
|
{
|
|
requestTask.SetResult(messageToDispatch);
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
throw new MethodHandlerDoesNotExistException(MessageType.Response, "response");
|
|
}
|
|
case MessageType.Event:
|
|
Func<Message, Task> eventHandler;
|
|
if (eventHandlers.TryGetValue(messageToDispatch.Method, out eventHandler))
|
|
{
|
|
handlerToAwait = eventHandler(messageToDispatch);
|
|
}
|
|
else
|
|
{
|
|
throw new MethodHandlerDoesNotExistException(MessageType.Event, messageToDispatch.Method);
|
|
}
|
|
break;
|
|
default:
|
|
// TODO: This case isn't handled properly
|
|
break;
|
|
}
|
|
|
|
// Skip processing if there isn't anything to do
|
|
if (handlerToAwait == null)
|
|
{
|
|
return;
|
|
}
|
|
|
|
// Run the handler
|
|
try
|
|
{
|
|
await handlerToAwait;
|
|
}
|
|
catch (TaskCanceledException)
|
|
{
|
|
// Some tasks may be cancelled due to legitimate
|
|
// timeouts so don't let those exceptions go higher.
|
|
}
|
|
catch (AggregateException e)
|
|
{
|
|
if (!(e.InnerExceptions[0] is TaskCanceledException))
|
|
{
|
|
// Cancelled tasks aren't a problem, so rethrow
|
|
// anything that isn't a TaskCanceledException
|
|
throw;
|
|
}
|
|
}
|
|
}
|
|
|
|
#endregion
|
|
}
|
|
}
|