Feat/result streaming - Fix for issue #746 in toolsservice and issue Microsoft/azuredatastudio#3348 (#753)

* fix for issues 746 & azuredatastudio issue 3348

* test coverage improvement for results streaming

* addressed minor review comments

* adding generated file test/CodeCoverage/package-lock.json to workaround code coverage issue.
This commit is contained in:
Arvind Ranasaria
2018-12-04 20:49:05 -08:00
committed by GitHub
parent a1946edca3
commit e4808c12aa
6 changed files with 3996 additions and 88 deletions

View File

@@ -360,6 +360,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
//
Validate.IsNotNull(nameof(dbDataReader), dbDataReader);
Task availableTask = null;
try
{
// Verify the request hasn't been cancelled
@@ -386,34 +387,37 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
// Mark that read of result has started
//
hasStartedRead = true;
// Invoke the SendCurrentResults() asynchronously that will send the results available notification
// and also trigger the timer to send periodic updates.
//
availableTask = SendCurrentResults();
while (await dataReader.ReadAsync(cancellationToken))
{
fileOffsets.Add(totalBytesWritten);
totalBytesWritten += fileWriter.WriteRow(dataReader);
// If we have never triggered the timer to start sending the results available/updated notification
// then: Trigger the timer to start sending results update notification
//
if (LastUpdatedSummary == null)
{
// Invoke the timer to send available/update result set notification immediately
//
resultsTimer.Change(0, Timeout.Infinite);
}
}
CheckForIsJson();
}
}
finally
{
hasCompletedRead = true; // set the flag to indicate that we are done reading
// Make a final call to ResultUpdated by invoking the timer to send update result set notification immediately
// set the flag to indicate that we are done reading
//
resultsTimer.Change(0, Timeout.Infinite);
hasCompletedRead = true;
// await the completion of available notification in case it is not already done before proceeding
//
await availableTask;
// Make a final call to SendCurrentResults() and await its completion. If the previously scheduled task already took care of latest status send then this should be a no-op
//
await SendCurrentResults();
// and finally:
// Make a call to send ResultCompletion and await for it to Complete
// Make a call to send ResultCompletion and await its completion. This is just for backward compatibility with older protocol
//
await (ResultCompletion?.Invoke(this) ?? Task.CompletedTask);
}
@@ -613,30 +617,52 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// Sends the ResultsUpdated message if the number of rows has changed since last send.
/// </summary>
/// <param name="stateInfo"></param>
private async void SendResultAvailableOrUpdated (object stateInfo = null)
private void SendResultAvailableOrUpdated (object stateInfo = null)
{
// Make the call to send current results and synchronously wait for it to finish
//
SendCurrentResults().Wait();
}
private async Task SendCurrentResults()
{
// await for previous task to complete before proceeding
//
await ResultsTask;
ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone();
if (LastUpdatedSummary == null) // We need to send results available message.
if (LastUpdatedSummary == null) // We need to send results available message.
{
// Fire off results Available task and await for it complete
// Fire off results Available task and await it
//
await (ResultAvailable?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
ResultAvailable = null; // set this to null as we need to call ResultAvailable only once
ResultsTask = (ResultAvailable?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
await ResultsTask;
}
else // We need to send results updated message.
else if (LastUpdatedSummary.Complete) // If last result summary sent had already set the Complete flag
{
// If there has been no change in rowCount since last update and we are not done yet then log and increase the timer duration
// We do not need to do anything except that make sure that RowCount has not update since last send.
Debug.Assert(LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount, $"Already reported rows should be equal to current RowCount, if had already sent completion flag as set in last message, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}");
}
else // We need to send results updated message.
{
// Previously reported rows should be less than or equal to current number of rows about to be reported
//
if (!currentResultSetSnapshot.hasCompletedRead && LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount)
Debug.Assert(LastUpdatedSummary.RowCount <= currentResultSetSnapshot.RowCount, $"Already reported rows should less than or equal to current total RowCount, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}");
// If there has been no change in rowCount since last update and we have not yet completed read then log and increase the timer duration
//
if (!currentResultSetSnapshot.hasCompletedRead &&
LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount)
{
Logger.Write(TraceEventType.Warning, $"The result set:{Summary} has not made any progress in last {ResultTimerInterval} milliseconds and the read of resultset is not completed yet!");
Logger.Write(TraceEventType.Warning,
$"The result set:{Summary} has not made any progress in last {ResultTimerInterval} milliseconds and the read of this result set is not yet complete!");
ResultsIntervalMultiplier++;
}
// Fire off results updated task and await for it complete
// Fire off results updated task and await it
//
await (ResultUpdated?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
ResultsTask = (ResultUpdated?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
await ResultsTask;
}
// Update the LastUpdatedSummary to be the value captured in current snapshot
@@ -644,14 +670,17 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
LastUpdatedSummary = currentResultSetSnapshot.Summary;
// Setup timer for the next callback
//
if (currentResultSetSnapshot.hasCompletedRead)
{
//If we have already completed reading then we are done and we do not need to send any more updates. Switch off timer.
// If we have already completed reading then we are done and we do not need to send any more updates. Switch off timer.
//
resultsTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
else
{
// If we have not yet completed reading then set the timer so this method gets called again after ResultTimerInterval milliseconds
//
resultsTimer.Change(ResultTimerInterval, Timeout.Infinite);
}
}
@@ -662,6 +691,8 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
internal ResultSetSummary LastUpdatedSummary { get; set; } = null;
internal Task ResultsTask { get; set; } = Task.CompletedTask;
/// <summary>
/// If the result set represented by this class corresponds to a single XML
/// column that contains results of "for xml" query, set isXml = true