Fix serialization tests & don't block thread (#846)

* Fix serialization tests & don't block thread
- Fixed potential null ref when closing streams
- Always clean up serialization queue if an error occurs
- Stop blocking dispatcher thread by not awaiting task that processes the message
- Improved error logging in EventFlowValidator to help debug issues
- Close stream on exception
This commit is contained in:
Kevin Cunnane
2019-08-09 09:48:57 -07:00
committed by GitHub
parent 4fe02a6885
commit d42e3626cb
4 changed files with 84 additions and 55 deletions

View File

@@ -14,6 +14,7 @@ using Microsoft.SqlTools.Hosting;
using Microsoft.SqlTools.Hosting.Protocol;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage;
using Microsoft.SqlTools.ServiceLayer.Utility;
using Microsoft.SqlTools.Utility;
@@ -40,93 +41,111 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// <summary>
/// Begin to process request to save a resultSet to a file in CSV format
/// </summary>
internal async Task HandleSerializeStartRequest(SerializeDataStartRequestParams serializeParams,
internal Task HandleSerializeStartRequest(SerializeDataStartRequestParams serializeParams,
RequestContext<SerializeDataResult> requestContext)
{
// Run in separate thread so that message thread isn't held up by a potentially time consuming file write
Task.Run(async () => {
await RunSerializeStartRequest(serializeParams, requestContext);
}).ContinueWithOnFaulted(async t => await SendErrorAndCleanup(serializeParams?.FilePath, requestContext, t.Exception));
return Task.CompletedTask;
}
internal async Task RunSerializeStartRequest(SerializeDataStartRequestParams serializeParams, RequestContext<SerializeDataResult> requestContext)
{
try
{
// Verify we have sensible inputs and there isn't a task running for this file already
Validate.IsNotNull(nameof(serializeParams), serializeParams);
Validate.IsNotNullOrWhitespaceString("FilePath", serializeParams.FilePath);
DataSerializer serializer = null;
bool hasSerializer = inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer);
if (hasSerializer)
if (inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer))
{
// Cannot proceed as there is an in progress serialization happening
throw new Exception(SR.SerializationServiceRequestInProgress(serializeParams.FilePath));
}
// Create a new serializer, save for future calls if needed, and write the request out
serializer = new DataSerializer(serializeParams);
if (!serializeParams.IsLastBatch)
{
inProgressSerializations.AddOrUpdate(serializer.FilePath, serializer, (key, old) => serializer);
}
Func<Task<SerializeDataResult>> writeData = () =>
{
return Task.Factory.StartNew(() =>
{
var result = serializer.ProcessRequest(serializeParams);
return result;
});
};
await HandleRequest(writeData, requestContext, "HandleSerializeStartRequest");
Logger.Write(TraceEventType.Verbose, "HandleSerializeStartRequest");
SerializeDataResult result = serializer.ProcessRequest(serializeParams);
await requestContext.SendResult(result);
}
catch (Exception ex)
{
await requestContext.SendError(ex.Message);
await SendErrorAndCleanup(serializeParams.FilePath, requestContext, ex);
}
}
private async Task SendErrorAndCleanup(string filePath, RequestContext<SerializeDataResult> requestContext, Exception ex)
{
if (filePath != null)
{
try
{
DataSerializer removed;
inProgressSerializations.TryRemove(filePath, out removed);
if (removed != null)
{
// Flush any contents to disk and remove the writer
removed.CloseStreams();
}
}
catch
{
// Do not care if there was an error removing this, must always delete if something failed
}
}
await requestContext.SendError(ex.Message);
}
/// <summary>
/// Process request to save a resultSet to a file in CSV format
/// </summary>
internal async Task HandleSerializeContinueRequest(SerializeDataContinueRequestParams serializeParams,
internal Task HandleSerializeContinueRequest(SerializeDataContinueRequestParams serializeParams,
RequestContext<SerializeDataResult> requestContext)
{
// Run in separate thread so that message thread isn't held up by a potentially time consuming file write
Task.Run(async () =>
{
await RunSerializeContinueRequest(serializeParams, requestContext);
}).ContinueWithOnFaulted(async t => await SendErrorAndCleanup(serializeParams?.FilePath, requestContext, t.Exception));
return Task.CompletedTask;
}
internal async Task RunSerializeContinueRequest(SerializeDataContinueRequestParams serializeParams, RequestContext<SerializeDataResult> requestContext)
{
try
{
// Verify we have sensible inputs and some data has already been sent for the file
Validate.IsNotNull(nameof(serializeParams), serializeParams);
Validate.IsNotNullOrWhitespaceString("FilePath", serializeParams.FilePath);
DataSerializer serializer = null;
bool hasSerializer = inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer);
if (!hasSerializer)
if (!inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer))
{
throw new Exception(SR.SerializationServiceRequestNotFound(serializeParams.FilePath));
}
Func<Task<SerializeDataResult>> writeData = () =>
// Write to file and cleanup if needed
Logger.Write(TraceEventType.Verbose, "HandleSerializeContinueRequest");
SerializeDataResult result = serializer.ProcessRequest(serializeParams);
if (serializeParams.IsLastBatch)
{
return Task.Factory.StartNew(() =>
{
var result = serializer.ProcessRequest(serializeParams);
if (serializeParams.IsLastBatch)
{
// Cleanup the serializer
this.inProgressSerializations.TryRemove(serializer.FilePath, out serializer);
}
return result;
});
};
await HandleRequest(writeData, requestContext, "HandleSerializeContinueRequest");
}
catch (Exception ex)
{
await requestContext.SendError(ex.Message);
}
}
private async Task HandleRequest<T>(Func<Task<T>> handler, RequestContext<T> requestContext, string requestType)
{
Logger.Write(TraceEventType.Verbose, requestType);
try
{
T result = await handler();
// Cleanup the serializer
this.inProgressSerializations.TryRemove(serializer.FilePath, out serializer);
}
await requestContext.SendResult(result);
}
catch (Exception ex)
{
await requestContext.SendError(ex.Message);
await SendErrorAndCleanup(serializeParams.FilePath, requestContext, ex);
}
}
}
@@ -242,9 +261,13 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
this.writer = factory.GetWriter(requestParams.FilePath);
}
}
private void CloseStreams()
public void CloseStreams()
{
this.writer.Dispose();
if (this.writer != null)
{
this.writer.Dispose();
this.writer = null;
}
}
private SaveResultsAsJsonRequestParams CreateJsonRequestParams()