Feature/connect cancel (#74)

* Implemented connection cancellation

* Made connect requests return immediately and created a separate connection complete notification

* Fix spelling

* Fix sorting

* Add separate lock for cancellation source map
This commit is contained in:
Mitchell Sternke
2016-10-04 15:45:52 -07:00
committed by GitHub
parent 62525b9c98
commit 8408bc6dff
17 changed files with 532 additions and 98 deletions

View File

@@ -4,10 +4,12 @@
//
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.Connection.Contracts;
using Microsoft.SqlTools.ServiceLayer.Connection.ReliableConnection;
@@ -47,6 +49,10 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
private Dictionary<string, ConnectionInfo> ownerToConnectionMap = new Dictionary<string, ConnectionInfo>();
private ConcurrentDictionary<string, CancellationTokenSource> ownerToCancellationTokenSourceMap = new ConcurrentDictionary<string, CancellationTokenSource>();
private Object cancellationTokenSourceLock = new Object();
/// <summary>
/// Map from script URIs to ConnectionInfo objects
/// This is internal for testing access only
@@ -131,21 +137,22 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
/// Open a connection with the specified connection details
/// </summary>
/// <param name="connectionParams"></param>
public ConnectResponse Connect(ConnectParams connectionParams)
public async Task<ConnectionCompleteParams> Connect(ConnectParams connectionParams)
{
// Validate parameters
string paramValidationErrorMessage;
if (connectionParams == null)
{
return new ConnectResponse
return new ConnectionCompleteParams
{
Messages = SR.ConnectionServiceConnectErrorNullParams
};
}
if (!connectionParams.IsValid(out paramValidationErrorMessage))
{
return new ConnectResponse
return new ConnectionCompleteParams
{
OwnerUri = connectionParams.OwnerUri,
Messages = paramValidationErrorMessage
};
}
@@ -164,7 +171,9 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
connectionInfo = new ConnectionInfo(ConnectionFactory, connectionParams.OwnerUri, connectionParams.Connection);
// try to connect
var response = new ConnectResponse();
var response = new ConnectionCompleteParams();
response.OwnerUri = connectionParams.OwnerUri;
CancellationTokenSource source = null;
try
{
// build the connection string from the input parameters
@@ -177,7 +186,36 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
// we'll remove this once ConnectionService is refactored to not own the LanguageService connection
connectionInfo.ConnectionDetails.MultipleActiveResultSets = true;
connectionInfo.SqlConnection.Open();
// Add a cancellation token source so that the connection OpenAsync() can be cancelled
using (source = new CancellationTokenSource())
{
// Locking here to perform two operations as one atomic operation
lock (cancellationTokenSourceLock)
{
// If the URI is currently connecting from a different request, cancel it before we try to connect
CancellationTokenSource currentSource;
if (ownerToCancellationTokenSourceMap.TryGetValue(connectionParams.OwnerUri, out currentSource))
{
currentSource.Cancel();
}
ownerToCancellationTokenSourceMap[connectionParams.OwnerUri] = source;
}
// Create a task to handle cancellation requests
var cancellationTask = Task.Run(() =>
{
source.Token.WaitHandle.WaitOne();
source.Token.ThrowIfCancellationRequested();
});
var openTask = Task.Run(async () => {
await connectionInfo.SqlConnection.OpenAsync(source.Token);
});
// Open the connection
await Task.WhenAny(openTask, cancellationTask).Unwrap();
source.Cancel();
}
}
catch (SqlException ex)
{
@@ -186,12 +224,32 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
response.Messages = ex.ToString();
return response;
}
catch (OperationCanceledException)
{
// OpenAsync was cancelled
response.Messages = SR.ConnectionServiceConnectionCanceled;
return response;
}
catch (Exception ex)
{
response.ErrorMessage = ex.Message;
response.Messages = ex.ToString();
return response;
}
finally
{
// Remove our cancellation token from the map since we're no longer connecting
// Using a lock here to perform two operations as one atomic operation
lock (cancellationTokenSourceLock)
{
// Only remove the token from the map if it is the same one created by this request
CancellationTokenSource sourceValue;
if (ownerToCancellationTokenSourceMap.TryGetValue(connectionParams.OwnerUri, out sourceValue) && sourceValue == source)
{
ownerToCancellationTokenSourceMap.TryRemove(connectionParams.OwnerUri, out sourceValue);
}
}
}
ownerToConnectionMap[connectionParams.OwnerUri] = connectionInfo;
@@ -208,7 +266,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
// invoke callback notifications
foreach (var activity in this.onConnectionActivities)
{
activity(connectionInfo);
await activity(connectionInfo);
}
// try to get information about the connected SQL Server instance
@@ -242,6 +300,37 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
return response;
}
/// <summary>
/// Cancel a connection that is in the process of opening.
/// </summary>
public bool CancelConnect(CancelConnectParams cancelParams)
{
// Validate parameters
if (cancelParams == null || string.IsNullOrEmpty(cancelParams.OwnerUri))
{
return false;
}
// Cancel any current connection attempts for this URI
CancellationTokenSource source;
if (ownerToCancellationTokenSourceMap.TryGetValue(cancelParams.OwnerUri, out source))
{
try
{
source.Cancel();
return true;
}
catch
{
return false;
}
}
else
{
return false;
}
}
/// <summary>
/// Close a connection with the specified connection details.
/// </summary>
@@ -253,6 +342,12 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
return false;
}
// Cancel if we are in the middle of connecting
if (CancelConnect(new CancelConnectParams() { OwnerUri = disconnectParams.OwnerUri }))
{
return false;
}
// Lookup the connection owned by the URI
ConnectionInfo info;
if (!ownerToConnectionMap.TryGetValue(disconnectParams.OwnerUri, out info))
@@ -327,6 +422,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
// Register request and event handlers with the Service Host
serviceHost.SetRequestHandler(ConnectionRequest.Type, HandleConnectRequest);
serviceHost.SetRequestHandler(CancelConnectRequest.Type, HandleCancelConnectRequest);
serviceHost.SetRequestHandler(DisconnectRequest.Type, HandleDisconnectRequest);
serviceHost.SetRequestHandler(ListDatabasesRequest.Type, HandleListDatabasesRequest);
@@ -359,14 +455,50 @@ namespace Microsoft.SqlTools.ServiceLayer.Connection
/// <returns></returns>
protected async Task HandleConnectRequest(
ConnectParams connectParams,
RequestContext<ConnectResponse> requestContext)
RequestContext<bool> requestContext)
{
Logger.Write(LogLevel.Verbose, "HandleConnectRequest");
try
{
// open connection base on request details
ConnectResponse result = ConnectionService.Instance.Connect(connectParams);
// create a task to connect asyncronously so that other requests are not blocked in the meantime
Task.Run(async () =>
{
try
{
// open connection based on request details
ConnectionCompleteParams result = await ConnectionService.Instance.Connect(connectParams);
await ServiceHost.SendEvent(ConnectionCompleteNotification.Type, result);
}
catch (Exception ex)
{
ConnectionCompleteParams result = new ConnectionCompleteParams()
{
Messages = ex.ToString()
};
await ServiceHost.SendEvent(ConnectionCompleteNotification.Type, result);
}
});
await requestContext.SendResult(true);
}
catch
{
await requestContext.SendResult(false);
}
}
/// <summary>
/// Handle cancel connect requests
/// </summary>
protected async Task HandleCancelConnectRequest(
CancelConnectParams cancelParams,
RequestContext<bool> requestContext)
{
Logger.Write(LogLevel.Verbose, "HandleCancelConnectRequest");
try
{
bool result = ConnectionService.Instance.CancelConnect(cancelParams);
await requestContext.SendResult(result);
}
catch(Exception ex)