This change ensures that we send back only one 'isComplete=true' message for a given result set (#763)

Fixes:
The streaming protocol is now tightened to ensure that only the last message for a result set contain isCompleted=true. Now isCompleted=true is never sent in isAvailable message.
Tightened logic in sending messages to make sure that no duplicate messages get sent out due to concurrent processing.
Made a fix to a null reference exception when processing special action which was a pre-existing benign bug.
Testing: Added 1 more new test that runs existing tests concurrently 1000 times to make sure no random timing issues are observed and tightened verifications to existing tests to ensure no duplicate messages and only one isComplete=true message is sent across.

* tightening the protocal to ensure only one message (the last one) with completed=true is sent back for a single result set within a query batch
This commit is contained in:
Arvind Ranasaria
2019-01-11 10:21:52 -08:00
committed by GitHub
parent 0fd98df79b
commit 5f6d500977
4 changed files with 128 additions and 56 deletions

View File

@@ -91,7 +91,9 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
/// </summary>
internal long totalBytesWritten;
internal readonly Timer resultsTimer;
private readonly Timer resultsTimer;
private readonly SemaphoreSlim sendResultsSemphore = new SemaphoreSlim(1);
#endregion
@@ -403,14 +405,16 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
}
finally
{
// set the flag to indicate that we are done reading
//
hasCompletedRead = true;
// await the completion of available notification in case it is not already done before proceeding
//
await availableTask;
// now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications.
//
hasCompletedRead = true;
// Make a final call to SendCurrentResults() and await its completion. If the previously scheduled task already took care of latest status send then this should be a no-op
//
await SendCurrentResults();
@@ -626,62 +630,72 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
private async Task SendCurrentResults()
{
// await for previous task to complete before proceeding
//
await ResultsTask;
try
{
ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone();
if (LastUpdatedSummary == null) // We need to send results available message.
{
// Fire off results Available task and await it
// Wait to acquire the sendResultsSemphore before proceeding, as we want only one instance of this method executing at any given time.
//
ResultsTask = (ResultAvailable?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
await ResultsTask;
}
else if (LastUpdatedSummary.Complete) // If last result summary sent had already set the Complete flag
{
// We do not need to do anything except that make sure that RowCount has not update since last send.
Debug.Assert(LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount, $"Already reported rows should be equal to current RowCount, if had already sent completion flag as set in last message, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}");
}
else // We need to send results updated message.
{
// Previously reported rows should be less than or equal to current number of rows about to be reported
//
Debug.Assert(LastUpdatedSummary.RowCount <= currentResultSetSnapshot.RowCount, $"Already reported rows should less than or equal to current total RowCount, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}");
sendResultsSemphore.Wait();
// If there has been no change in rowCount since last update and we have not yet completed read then log and increase the timer duration
//
if (!currentResultSetSnapshot.hasCompletedRead &&
LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount)
ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone();
if (LastUpdatedSummary == null) // We need to send results available message.
{
Logger.Write(TraceEventType.Warning,
$"The result set:{Summary} has not made any progress in last {ResultTimerInterval} milliseconds and the read of this result set is not yet complete!");
ResultsIntervalMultiplier++;
// Fire off results Available task and await it
//
await (ResultAvailable?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
}
else if (LastUpdatedSummary.Complete) // If last result summary sent had already set the Complete flag
{
// We do not need to do anything except that make sure that RowCount has not update since last send.
Debug.Assert(LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount,
$"Already reported rows should be equal to current RowCount, if had already sent completion flag as set in last message, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}");
}
else // We need to send results updated message.
{
// Previously reported rows should be less than or equal to current number of rows about to be reported
//
Debug.Assert(LastUpdatedSummary.RowCount <= currentResultSetSnapshot.RowCount,
$"Already reported rows should less than or equal to current total RowCount, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}");
// If there has been no change in rowCount since last update and we have not yet completed read then log and increase the timer duration
//
if (!currentResultSetSnapshot.hasCompletedRead &&
LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount)
{
Logger.Write(TraceEventType.Warning,
$"The result set:{Summary} has not made any progress in last {ResultTimerInterval} milliseconds and the read of this result set is not yet complete!");
ResultsIntervalMultiplier++;
}
// Fire off results updated task and await it
//
await (ResultUpdated?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
}
// Fire off results updated task and await it
// Update the LastUpdatedSummary to be the value captured in current snapshot
//
ResultsTask = (ResultUpdated?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask);
await ResultsTask;
}
LastUpdatedSummary = currentResultSetSnapshot.Summary;
// Update the LastUpdatedSummary to be the value captured in current snapshot
//
LastUpdatedSummary = currentResultSetSnapshot.Summary;
// Setup timer for the next callback
//
if (currentResultSetSnapshot.hasCompletedRead)
{
// If we have already completed reading then we are done and we do not need to send any more updates. Switch off timer.
// Setup timer for the next callback
//
resultsTimer.Change(Timeout.Infinite, Timeout.Infinite);
if (currentResultSetSnapshot.hasCompletedRead)
{
// If we have already completed reading then we are done and we do not need to send any more updates. Switch off timer.
//
resultsTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
else
{
// If we have not yet completed reading then set the timer so this method gets called again after ResultTimerInterval milliseconds
//
resultsTimer.Change(ResultTimerInterval, Timeout.Infinite);
}
}
else
{
// If we have not yet completed reading then set the timer so this method gets called again after ResultTimerInterval milliseconds
finally
{
// Release the sendResultsSemphore so the next invocation gets unblocked
//
resultsTimer.Change(ResultTimerInterval, Timeout.Infinite);
sendResultsSemphore.Release();
}
}
@@ -691,8 +705,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
internal ResultSetSummary LastUpdatedSummary { get; set; } = null;
internal Task ResultsTask { get; set; } = Task.CompletedTask;
/// <summary>
/// If the result set represented by this class corresponds to a single XML
/// column that contains results of "for xml" query, set isXml = true