Make save result async (#107)

* Make save results asynchronous

* Prevent write share of file

* Lock objects in stages

* Create Save result objects

* refactor and write rows in batches

* CHange batchSize from test value

* Remove await in handler

* Removing the file reader as a member of the resultset

* Change Dispose to wait for save

* Change concurrentBag

* PascalCase variables

* Modify function signature and tests

* Safe file methods

* refactor ResultSets to Ilist and remove ToList

* Change dictionary key and prevent add to saveTasks during dispose

* Simplify row concatenation

* Fix prevent add

* Fix prevent add

* Add methods to expose saveTasks and isBeingDisposed
This commit is contained in:
Sharon Ravindran
2016-10-21 20:07:21 -07:00
committed by GitHub
parent b389d275a2
commit 2a688cb87f
7 changed files with 468 additions and 196 deletions

View File

@@ -4,6 +4,7 @@
//
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
@@ -44,12 +45,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// </summary>
private readonly IFileStreamFactory fileStreamFactory;
/// <summary>
/// File stream reader that will be reused to make rapid-fire retrieval of result subsets
/// quick and low perf impact.
/// </summary>
private IFileStreamReader fileStreamReader;
/// <summary>
/// Whether or not the result set has been read in from the database
/// </summary>
@@ -65,6 +60,16 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// </summary>
private readonly string outputFileName;
/// <summary>
/// Whether the resultSet is in the process of being disposed
/// </summary>
private bool isBeingDisposed;
/// <summary>
/// All save tasks currently saving this ResultSet
/// </summary>
private ConcurrentDictionary<string, Task> saveTasks;
#endregion
/// <summary>
@@ -86,10 +91,23 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
// Store the factory
fileStreamFactory = factory;
hasBeenRead = false;
saveTasks = new ConcurrentDictionary<string, Task>();
}
#region Properties
/// <summary>
/// Whether the resultSet is in the process of being disposed
/// </summary>
/// <returns></returns>
internal bool IsBeingDisposed
{
get
{
return isBeingDisposed;
}
}
/// <summary>
/// The columns for this result set
/// </summary>
@@ -120,18 +138,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// </summary>
public long RowCount { get; private set; }
/// <summary>
/// The rows of this result set
/// </summary>
public IEnumerable<string[]> Rows
{
get
{
return FileOffsets.Select(
offset => fileStreamReader.ReadRow(offset, Columns).Select(cell => cell.DisplayValue).ToArray());
}
}
#endregion
#region Public Methods
@@ -145,7 +151,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
public Task<ResultSetSubset> GetSubset(int startRow, int rowCount)
{
// Sanity check to make sure that the results have been read beforehand
if (!hasBeenRead || fileStreamReader == null)
if (!hasBeenRead)
{
throw new InvalidOperationException(SR.QueryServiceResultSetNotRead);
}
@@ -164,28 +170,30 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
{
string[][] rows;
// If result set is 'for xml' or 'for json',
// Concatenate all the rows together into one row
if (isSingleColumnXmlJsonResultSet)
using (IFileStreamReader fileStreamReader = fileStreamFactory.GetReader(outputFileName))
{
// Iterate over all the rows and process them into a list of string builders
IEnumerable<StringBuilder> sbRows = FileOffsets.Select(rowOffset => fileStreamReader.ReadRow(rowOffset, Columns)
.Select(cell => cell.DisplayValue).Aggregate(new StringBuilder(), (sb, value) => sb.Append(value)));
rows = new[] { new[] { string.Join(string.Empty, sbRows) } };
// If result set is 'for xml' or 'for json',
// Concatenate all the rows together into one row
if (isSingleColumnXmlJsonResultSet)
{
// Iterate over all the rows and process them into a list of string builders
IEnumerable<string> rowValues = FileOffsets.Select(rowOffset => fileStreamReader.ReadRow(rowOffset, Columns)[0].DisplayValue);
rows = new[] { new[] { string.Join(string.Empty, rowValues) } };
}
else
{
// Figure out which rows we need to read back
IEnumerable<long> rowOffsets = FileOffsets.Skip(startRow).Take(rowCount);
// Iterate over the rows we need and process them into output
rows = rowOffsets.Select(rowOffset =>
fileStreamReader.ReadRow(rowOffset, Columns).Select(cell => cell.DisplayValue).ToArray())
.ToArray();
}
}
else
{
// Figure out which rows we need to read back
IEnumerable<long> rowOffsets = FileOffsets.Skip(startRow).Take(rowCount);
// Iterate over the rows we need and process them into output
rows = rowOffsets.Select(rowOffset =>
fileStreamReader.ReadRow(rowOffset, Columns).Select(cell => cell.DisplayValue).ToArray())
.ToArray();
}
// Retrieve the subset of the results as per the request
return new ResultSetSubset
{
@@ -203,7 +211,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
{
// Mark that result has been read
hasBeenRead = true;
fileStreamReader = fileStreamFactory.GetReader(outputFileName);
// Open a writer for the file
using (IFileStreamWriter fileWriter = fileStreamFactory.GetWriter(outputFileName, MaxCharsToStore, MaxXmlCharsToStore))
@@ -244,13 +251,31 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
return;
}
if (disposing)
isBeingDisposed = true;
// Check if saveTasks are running for this ResultSet
if (!saveTasks.IsEmpty)
{
fileStreamReader?.Dispose();
fileStreamFactory.DisposeFile(outputFileName);
// Wait for tasks to finish before disposing ResultSet
Task.WhenAll(saveTasks.Values.ToArray()).ContinueWith((antecedent) =>
{
if (disposing)
{
fileStreamFactory.DisposeFile(outputFileName);
}
disposed = true;
isBeingDisposed = false;
});
}
else
{
// If saveTasks is empty, continue with dispose
if (disposing)
{
fileStreamFactory.DisposeFile(outputFileName);
}
disposed = true;
isBeingDisposed = false;
}
disposed = true;
}
#endregion
@@ -283,5 +308,25 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
}
#endregion
#region Internal Methods to Add and Remove save tasks
internal void AddSaveTask(string key, Task saveTask)
{
saveTasks.TryAdd(key, saveTask);
}
internal void RemoveSaveTask(string key)
{
Task completedTask;
saveTasks.TryRemove(key, out completedTask);
}
internal Task GetSaveTask(string key)
{
Task completedTask;
saveTasks.TryRemove(key, out completedTask);
return completedTask;
}
#endregion
}
}