diff --git a/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/ResultSet.cs b/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/ResultSet.cs index 7c465860..f2a50e09 100644 --- a/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/ResultSet.cs +++ b/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/ResultSet.cs @@ -91,7 +91,9 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution /// internal long totalBytesWritten; - internal readonly Timer resultsTimer; + private readonly Timer resultsTimer; + + private readonly SemaphoreSlim sendResultsSemphore = new SemaphoreSlim(1); #endregion @@ -403,14 +405,16 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution } finally { - // set the flag to indicate that we are done reading - // - hasCompletedRead = true; // await the completion of available notification in case it is not already done before proceeding // await availableTask; + // now set the flag to indicate that we are done reading. this equates to Complete flag to be marked 'True' in any future notifications. + // + hasCompletedRead = true; + + // Make a final call to SendCurrentResults() and await its completion. If the previously scheduled task already took care of latest status send then this should be a no-op // await SendCurrentResults(); @@ -626,62 +630,72 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution private async Task SendCurrentResults() { - // await for previous task to complete before proceeding - // - await ResultsTask; + try + { - ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone(); - if (LastUpdatedSummary == null) // We need to send results available message. - { - // Fire off results Available task and await it + // Wait to acquire the sendResultsSemphore before proceeding, as we want only one instance of this method executing at any given time. // - ResultsTask = (ResultAvailable?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask); - await ResultsTask; - } - else if (LastUpdatedSummary.Complete) // If last result summary sent had already set the Complete flag - { - // We do not need to do anything except that make sure that RowCount has not update since last send. - Debug.Assert(LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount, $"Already reported rows should be equal to current RowCount, if had already sent completion flag as set in last message, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}"); - } - else // We need to send results updated message. - { - // Previously reported rows should be less than or equal to current number of rows about to be reported - // - Debug.Assert(LastUpdatedSummary.RowCount <= currentResultSetSnapshot.RowCount, $"Already reported rows should less than or equal to current total RowCount, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}"); + sendResultsSemphore.Wait(); - // If there has been no change in rowCount since last update and we have not yet completed read then log and increase the timer duration - // - if (!currentResultSetSnapshot.hasCompletedRead && - LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount) + ResultSet currentResultSetSnapshot = (ResultSet) MemberwiseClone(); + if (LastUpdatedSummary == null) // We need to send results available message. { - Logger.Write(TraceEventType.Warning, - $"The result set:{Summary} has not made any progress in last {ResultTimerInterval} milliseconds and the read of this result set is not yet complete!"); - ResultsIntervalMultiplier++; + // Fire off results Available task and await it + // + await (ResultAvailable?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask); + } + else if (LastUpdatedSummary.Complete) // If last result summary sent had already set the Complete flag + { + // We do not need to do anything except that make sure that RowCount has not update since last send. + Debug.Assert(LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount, + $"Already reported rows should be equal to current RowCount, if had already sent completion flag as set in last message, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}"); + } + else // We need to send results updated message. + { + // Previously reported rows should be less than or equal to current number of rows about to be reported + // + Debug.Assert(LastUpdatedSummary.RowCount <= currentResultSetSnapshot.RowCount, + $"Already reported rows should less than or equal to current total RowCount, countReported:{LastUpdatedSummary.RowCount}, current total row count: {currentResultSetSnapshot.RowCount}, row count override: {currentResultSetSnapshot.rowCountOverride}, this.rowCountOverride: {this.rowCountOverride} and this.RowCount: {this.RowCount}, LastUpdatedSummary: {LastUpdatedSummary}"); + + // If there has been no change in rowCount since last update and we have not yet completed read then log and increase the timer duration + // + if (!currentResultSetSnapshot.hasCompletedRead && + LastUpdatedSummary.RowCount == currentResultSetSnapshot.RowCount) + { + Logger.Write(TraceEventType.Warning, + $"The result set:{Summary} has not made any progress in last {ResultTimerInterval} milliseconds and the read of this result set is not yet complete!"); + ResultsIntervalMultiplier++; + } + + // Fire off results updated task and await it + // + await (ResultUpdated?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask); } - // Fire off results updated task and await it + // Update the LastUpdatedSummary to be the value captured in current snapshot // - ResultsTask = (ResultUpdated?.Invoke(currentResultSetSnapshot) ?? Task.CompletedTask); - await ResultsTask; - } + LastUpdatedSummary = currentResultSetSnapshot.Summary; - // Update the LastUpdatedSummary to be the value captured in current snapshot - // - LastUpdatedSummary = currentResultSetSnapshot.Summary; - - // Setup timer for the next callback - // - if (currentResultSetSnapshot.hasCompletedRead) - { - // If we have already completed reading then we are done and we do not need to send any more updates. Switch off timer. + // Setup timer for the next callback // - resultsTimer.Change(Timeout.Infinite, Timeout.Infinite); + if (currentResultSetSnapshot.hasCompletedRead) + { + // If we have already completed reading then we are done and we do not need to send any more updates. Switch off timer. + // + resultsTimer.Change(Timeout.Infinite, Timeout.Infinite); + } + else + { + // If we have not yet completed reading then set the timer so this method gets called again after ResultTimerInterval milliseconds + // + resultsTimer.Change(ResultTimerInterval, Timeout.Infinite); + } } - else - { - // If we have not yet completed reading then set the timer so this method gets called again after ResultTimerInterval milliseconds + finally + { + // Release the sendResultsSemphore so the next invocation gets unblocked // - resultsTimer.Change(ResultTimerInterval, Timeout.Infinite); + sendResultsSemphore.Release(); } } @@ -691,8 +705,6 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution internal ResultSetSummary LastUpdatedSummary { get; set; } = null; - internal Task ResultsTask { get; set; } = Task.CompletedTask; - /// /// If the result set represented by this class corresponds to a single XML /// column that contains results of "for xml" query, set isXml = true diff --git a/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/SpecialAction.cs b/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/SpecialAction.cs index f52fe96f..4598b018 100644 --- a/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/SpecialAction.cs +++ b/src/Microsoft.SqlTools.ServiceLayer/QueryExecution/SpecialAction.cs @@ -72,7 +72,7 @@ namespace Microsoft.SqlTools.ServiceLayer.QueryExecution /// public void CombineSpecialAction(SpecialAction action) { - flags |= action.flags; + flags |= ((action?.flags) ?? ActionFlags.None); } public override string ToString() => $"ActionFlag:'{flags}', ExpectYukonXMLShowPlan:'{ExpectYukonXMLShowPlan}'"; #endregion diff --git a/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ResultSetTests.cs b/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ResultSetTests.cs index aaabac0d..a4d81c54 100644 --- a/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ResultSetTests.cs +++ b/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ResultSetTests.cs @@ -125,6 +125,30 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution // VerifyReadResultToEnd(resultSet, resultSummaryFromAvailableCallback, resultSummaryFromCompleteCallback, resultSummariesFromUpdatedCallback); } + + /// + /// Read to End test + /// + /// + [Theory] + [MemberData(nameof(ReadToEndSuccessData), parameters: 3)] + public async Task ReadToEndSuccessSeveralTimes(TestResultSet[] testDataSet) + { + const int NumberOfInvocations = 5000; + List allTasks = new List(); + Parallel.ForEach(Partitioner.Create(0, NumberOfInvocations), (range) => + { + int start = range.Item1 == 0 ? 1 : range.Item1; + Task[] tasks = new Task[range.Item2 - start]; + for (int i = start; i < range.Item2; i++) + { + allTasks.Add(ReadToEndSuccess(testDataSet)); + } + + }); + await Task.WhenAll(allTasks); + } + public static IEnumerable ReadToEndSuccessData(int numTests) => Common.TestResultSetsEnumeration.Select(r => new object[] { new TestResultSet[] { r } }).Take(numTests); [Theory] @@ -160,7 +184,11 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution // ... The callback for result set available, update and completion callbacks should have been fired. // Assert.True(null != resultSummaryFromCompleteCallback, "completeResultSummary is null" + $"\r\n\t\tupdateResultSets: {string.Join("\r\n\t\t\t", resultSummariesFromUpdatedCallback)}"); - Assert.True(null != resultSummaryFromAvailableCallback, "availableResultSummary is null" + $"\r\n\t\tupdateResultSets: {string.Join("\r\n\t\t\t", resultSummariesFromUpdatedCallback)}"); + Assert.True(null != resultSummaryFromAvailableCallback, "availableResultSummary is null" + $"\r\n\t\tavailableResultSet: {resultSummaryFromAvailableCallback}"); + + // ... resultSetAvailable is not marked Complete + // + Assert.True(false == resultSummaryFromAvailableCallback.Complete, "availableResultSummary.Complete is true" + $"\r\n\t\tavailableResultSet: {resultSummaryFromAvailableCallback}"); // insert availableResult at the top of the resultSummariesFromUpdatedCallback list as the available result set is the first update in that series. // @@ -175,6 +203,7 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution Assert.True(resultSummariesFromUpdatedCallback.Last().Complete, $"Complete Check failed.\r\n\t\t resultSummariesFromUpdatedCallback:{string.Join("\r\n\t\t\t", resultSummariesFromUpdatedCallback)}"); + // ... The no of rows in the final updateResultSet/AvailableResultSet should be equal to that in the Complete Result Set. // Assert.True(resultSummaryFromCompleteCallback.RowCount == resultSummariesFromUpdatedCallback.Last().RowCount, @@ -184,7 +213,9 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution ); // ... RowCount should be in increasing order in updateResultSet callbacks + // ..... and there should be only one resultSummary with Complete flag set to true. // + int completeFlagCount = 0; Parallel.ForEach(Partitioner.Create(0, resultSummariesFromUpdatedCallback.Count), (range) => { int start = range.Item1 == 0 ? 1 : range.Item1; @@ -194,8 +225,13 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution $"Row Count of {i}th updateResultSet was smaller than that of the previous one" + $"\r\n\t\tupdateResultSets: {string.Join("\r\n\t\t\t", resultSummariesFromUpdatedCallback)}" ); + if (resultSummariesFromUpdatedCallback[i].Complete) + { + Interlocked.Increment(ref completeFlagCount); + } } }); + Assert.True(completeFlagCount == 1, "Number of update events with complete flag event set should be 1" + $"\r\n\t\tupdateResultSets: {string.Join("\r\n\t\t\t", resultSummariesFromUpdatedCallback)}"); } /// diff --git a/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ServiceIntegrationTests.cs b/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ServiceIntegrationTests.cs index 04f38e1b..9c8758f9 100644 --- a/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ServiceIntegrationTests.cs +++ b/test/Microsoft.SqlTools.ServiceLayer.UnitTests/QueryExecution/Execution/ServiceIntegrationTests.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.SqlTools.Hosting.Protocol.Contracts; using Microsoft.SqlTools.ServiceLayer.QueryExecution; @@ -652,17 +653,26 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution foreach (var (key, list) in resultSetDictionary) { ResultSetSummary completeSummary = null, lastResultSetSummary = null; + int completeFlagCount = 0; for (int i = 0; i < list.Count; i++) { - VerifyResultSummary(key, i, list, ref completeSummary, ref lastResultSetSummary); + VerifyResultSummary(key, i, list, ref completeSummary, ref lastResultSetSummary, ref completeFlagCount); } // Verify that the completeEvent and lastResultSetSummary has same number of rows // if (lastResultSetSummary != null && completeSummary != null) { - Assert.True(lastResultSetSummary.RowCount == completeSummary.RowCount, "CompleteSummary and last Update Summary should have same number of rows"); + Assert.True(lastResultSetSummary.RowCount == completeSummary.RowCount, $"CompleteSummary and last Update Summary should have same number of rows, updateSummaryRowCount: {lastResultSetSummary.RowCount}, CompleteSummaryRowCount: {completeSummary.RowCount}"); } + + // Verify that we got exactly on complete Flag Count. + // + Assert.True(1 == completeFlagCount, $"Complete flag should be set in exactly once on Update Result Summary Event. Observed Count: {completeFlagCount}, \r\nresultSetEventParamsList is:{string.Join("\r\n\t\t", list.ConvertAll((p) => p.GetType() + ":" + p.ResultSetSummary))}" ); + + // Verify that the complete event was set on the last updateResultSet event. + // + Assert.True(list.Last().ResultSetSummary.Complete, $"Complete flag should be set on the last Update Result Summary event, , \r\nresultSetEventParamsList is:{string.Join("\r\n\t\t", list.ConvertAll((p) => p.GetType() + ":" + p.ResultSetSummary))}"); } @@ -677,7 +687,8 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution /// The list of resultSetParams that we are verifying /// This should be null when we start validating the list of ResultSetEventParams /// This should be null when we start validating the list of ResultSetEventParams - private static void VerifyResultSummary(string batchIdResultSetId, int position, List resultSetEventParamsList, ref ResultSetSummary completeSummary, ref ResultSetSummary lastResultSetSummary) + /// The number of events with complete flag set. Increments input value if current resultSummary has Complete flag set + private static void VerifyResultSummary(string batchIdResultSetId, int position, List resultSetEventParamsList, ref ResultSetSummary completeSummary, ref ResultSetSummary lastResultSetSummary, ref int completeFlagCount) { ResultSetEventParams resultSetEventParams = resultSetEventParamsList[position]; switch (resultSetEventParams.GetType().Name) @@ -690,9 +701,16 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution + $"\r\nresultSetEventParamsList is:{string.Join("\r\n\t\t", resultSetEventParamsList.ConvertAll((p) => p.GetType() + ":" + p.ResultSetSummary))}" ); + // Verify that Complete flag is not set on the ResultSetAvailableEventParams + Assert.True(false == resultSetEventParams.ResultSetSummary.Complete, + $"availableResultSummary.Complete is true for {batchIdResultSetId}" + + $"\r\nresultSetEventParamsList is:{string.Join("\r\n\t\t", resultSetEventParamsList.ConvertAll((p) => p.GetType() + ":" + p.ResultSetSummary))}" + ); + // Save the lastResultSetSummary for this event for other verifications. // lastResultSetSummary = resultSetEventParams.ResultSetSummary; + break; case nameof(ResultSetUpdatedEventParams): // Verify that the updateEvent is not the first in the sequence. Since we set lastResultSetSummary on each available or updatedEvent, we check that there has been no lastResultSetSummary previously set yet. @@ -713,6 +731,12 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.QueryExecution.Execution // Save the lastResultSetSummary for this event for other verifications. // lastResultSetSummary = resultSetEventParams.ResultSetSummary; + + // increment completeFlagCount if complete flag is set. + if (resultSetEventParams.ResultSetSummary.Complete) + { + Interlocked.Increment(ref completeFlagCount); + } break; case nameof(ResultSetCompleteEventParams): // Verify that there is only one completeEvent