//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage;
using Microsoft.SqlTools.ServiceLayer.Utility;
using Microsoft.SqlTools.Utility;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution
{
///
/// Class that represents a resultset the was generated from a query. Contains logic for
/// storing and retrieving results. Is contained by a Batch class.
///
public class ResultSet : IDisposable
{
#region Constants
// Column names of 'for xml' and 'for json' queries
private const string NameOfForXmlColumn = "XML_F52E2B61-18A1-11d1-B105-00805F49916B";
private const string NameOfForJsonColumn = "JSON_F52E2B61-18A1-11d1-B105-00805F49916B";
private const string YukonXmlShowPlanColumn = "Microsoft SQL Server 2005 XML Showplan";
#endregion
#region Member Variables
///
/// For IDisposable pattern, whether or not object has been disposed
///
private bool disposed;
///
/// A list of offsets into the buffer file that correspond to where rows start
///
private readonly LongList fileOffsets;
///
/// The factory to use to get reading/writing handlers
///
private readonly IFileStreamFactory fileStreamFactory;
///
/// Whether or not the result set has been read in from the database,
/// set as internal in order to fake value in unit tests
///
internal bool hasBeenRead;
///
/// Whether resultSet is a 'for xml' or 'for json' result
///
private bool isSingleColumnXmlJsonResultSet;
///
/// The name of the temporary file we're using to output these results in
///
private readonly string outputFileName;
///
/// Row count to use in special scenarios where we want to override the number of rows.
///
private long? rowCountOverride;
///
/// The special action which applied to this result set
///
private readonly SpecialAction specialAction;
///
/// Total number of bytes written to the file. Used to jump to end of the file for append
/// scenarios. Internal for unit test validation.
///
internal long totalBytesWritten;
#endregion
///
/// Creates a new result set and initializes its state
///
/// The ID of the resultset, the ordinal of the result within the batch
/// The ID of the batch, the ordinal of the batch within the query
/// Factory for creating a reader/writer
public ResultSet(int ordinal, int batchOrdinal, IFileStreamFactory factory)
{
Id = ordinal;
BatchId = batchOrdinal;
// Initialize the storage
totalBytesWritten = 0;
outputFileName = factory.CreateFile();
fileOffsets = new LongList();
specialAction = new SpecialAction();
// Store the factory
fileStreamFactory = factory;
hasBeenRead = false;
SaveTasks = new ConcurrentDictionary();
}
#region Eventing
///
/// Asynchronous handler for when saving query results succeeds
///
/// Request parameters for identifying the request
public delegate Task SaveAsAsyncEventHandler(SaveResultsRequestParams parameters);
///
/// Asynchronous handler for when saving query results fails
///
/// Request parameters for identifying the request
/// Message to send back describing why the request failed
public delegate Task SaveAsFailureAsyncEventHandler(SaveResultsRequestParams parameters, string message);
///
/// Asynchronous handler for when a resultset has completed
///
/// The result set that completed
public delegate Task ResultSetAsyncEventHandler(ResultSet resultSet);
///
/// Event that will be called when the result set has completed execution
///
public event ResultSetAsyncEventHandler ResultCompletion;
#endregion
#region Properties
///
/// The columns for this result set
///
public DbColumnWrapper[] Columns { get; private set; }
///
/// ID of the result set, relative to the batch
///
public int Id { get; private set; }
///
/// ID of the batch set, relative to the query
///
public int BatchId { get; private set; }
///
/// The number of rows for this result set
///
public long RowCount => rowCountOverride ?? fileOffsets.Count;
///
/// All save tasks currently saving this ResultSet
///
internal ConcurrentDictionary SaveTasks { get; set; }
///
/// Generates a summary of this result set
///
public ResultSetSummary Summary
{
get
{
return new ResultSetSummary
{
ColumnInfo = Columns,
Id = Id,
BatchId = BatchId,
RowCount = RowCount,
SpecialAction = hasBeenRead ? ProcessSpecialAction() : null
};
}
}
#endregion
#region Public Methods
///
/// Returns a specific row from the result set.
///
///
/// Creates a new file reader for a single reader. This method should only be used for one
/// off requests, not for requesting a large subset of the results.
///
/// The internal ID of the row to read
/// The requested row
public IList GetRow(long rowId)
{
// Sanity check to make sure that results have been read beforehand
if (!hasBeenRead)
{
throw new InvalidOperationException(SR.QueryServiceResultSetNotRead);
}
// Sanity check to make sure that the row exists
if (rowId >= RowCount)
{
throw new ArgumentOutOfRangeException(nameof(rowId), SR.QueryServiceResultSetStartRowOutOfRange);
}
using (IFileStreamReader fileStreamReader = fileStreamFactory.GetReader(outputFileName))
{
return fileStreamReader.ReadRow(fileOffsets[rowId], rowId, Columns);
}
}
///
/// Generates a subset of the rows from the result set
///
/// The starting row of the results
/// How many rows to retrieve
/// A subset of results
public Task GetSubset(long startRow, int rowCount)
{
// Sanity check to make sure that the results have been read beforehand
if (!hasBeenRead)
{
throw new InvalidOperationException(SR.QueryServiceResultSetNotRead);
}
// Sanity check to make sure that the row and the row count are within bounds
if (startRow < 0 || startRow >= RowCount)
{
throw new ArgumentOutOfRangeException(nameof(startRow), SR.QueryServiceResultSetStartRowOutOfRange);
}
if (rowCount <= 0)
{
throw new ArgumentOutOfRangeException(nameof(rowCount), SR.QueryServiceResultSetRowCountOutOfRange);
}
return Task.Factory.StartNew(() =>
{
DbCellValue[][] rows;
using (IFileStreamReader fileStreamReader = fileStreamFactory.GetReader(outputFileName))
{
// If result set is 'for xml' or 'for json',
// Concatenate all the rows together into one row
if (isSingleColumnXmlJsonResultSet)
{
// Iterate over all the rows and process them into a list of string builders
// ReSharper disable once AccessToDisposedClosure The lambda is used immediately in string.Join call
IEnumerable rowValues = fileOffsets.Select(rowOffset => fileStreamReader.ReadRow(rowOffset, 0, Columns)[0].DisplayValue);
string singleString = string.Join(string.Empty, rowValues);
DbCellValue cellValue = new DbCellValue
{
DisplayValue = singleString,
IsNull = false,
RawObject = singleString,
RowId = 0
};
rows = new[] { new[] { cellValue } };
}
else
{
// Figure out which rows we need to read back
IEnumerable rowOffsets = fileOffsets.LongSkip(startRow).Take(rowCount);
// Iterate over the rows we need and process them into output
// ReSharper disable once AccessToDisposedClosure The lambda is used immediately in .ToArray call
rows = rowOffsets.Select((offset, id) => fileStreamReader.ReadRow(offset, id, Columns).ToArray()).ToArray();
}
}
// Retrieve the subset of the results as per the request
return new ResultSetSubset
{
Rows = rows,
RowCount = rows.Length
};
});
}
///
/// Generates the execution plan from the table returned
///
/// An execution plan object
public Task GetExecutionPlan()
{
// Process the action just incase is hasn't been yet
ProcessSpecialAction();
// Sanity check to make sure that the results have been read beforehand
if (!hasBeenRead)
{
throw new InvalidOperationException(SR.QueryServiceResultSetNotRead);
}
// Check that we this result set contains a showplan
if (!specialAction.ExpectYukonXMLShowPlan)
{
throw new Exception(SR.QueryServiceExecutionPlanNotFound);
}
return Task.Factory.StartNew(() =>
{
string content;
string format = null;
using (IFileStreamReader fileStreamReader = fileStreamFactory.GetReader(outputFileName))
{
// Determine the format and get the first col/row of XML
content = fileStreamReader.ReadRow(0, 0, Columns)[0].DisplayValue;
if (specialAction.ExpectYukonXMLShowPlan)
{
format = "xml";
}
}
return new ExecutionPlan
{
Format = format,
Content = content
};
});
}
///
/// Reads from the reader until there are no more results to read
///
/// The data reader for getting results from the db
/// Cancellation token for cancelling the query
public async Task ReadResultToEnd(DbDataReader dbDataReader, CancellationToken cancellationToken)
{
// Sanity check to make sure we got a reader
Validate.IsNotNull(nameof(dbDataReader), dbDataReader);
try
{
// Verify the request hasn't been cancelled
cancellationToken.ThrowIfCancellationRequested();
// Mark that result has been read
hasBeenRead = true;
StorageDataReader dataReader = new StorageDataReader(dbDataReader);
// Open a writer for the file
var fileWriter = fileStreamFactory.GetWriter(outputFileName);
using (fileWriter)
{
// If we can initialize the columns using the column schema, use that
if (!dataReader.DbDataReader.CanGetColumnSchema())
{
throw new InvalidOperationException(SR.QueryServiceResultSetNoColumnSchema);
}
Columns = dataReader.Columns;
while (await dataReader.ReadAsync(cancellationToken))
{
fileOffsets.Add(totalBytesWritten);
totalBytesWritten += fileWriter.WriteRow(dataReader);
}
}
// Check if resultset is 'for xml/json'. If it is, set isJson/isXml value in column metadata
SingleColumnXmlJsonResultSet();
CheckForIsJson();
}
finally
{
// Fire off a result set completion event if we have one
if (ResultCompletion != null)
{
await ResultCompletion(this);
}
}
}
///
/// Removes a row from the result set cache
///
/// Internal ID of the row
public void RemoveRow(long internalId)
{
// Make sure that the results have been read
if (!hasBeenRead)
{
throw new InvalidOperationException(SR.QueryServiceResultSetNotRead);
}
// Simply remove the row from the list of row offsets
fileOffsets.RemoveAt(internalId);
}
///
/// Adds a new row to the result set by reading the row from the provided db data reader
///
/// The result of a command to insert a new row should be UNREAD
public async Task AddRow(DbDataReader dbDataReader)
{
// Write the new row to the end of the file
long newOffset = await AppendRowToBuffer(dbDataReader);
// Add the row to file offset list
fileOffsets.Add(newOffset);
}
///
/// Updates the values in a row with the
///
///
///
///
public async Task UpdateRow(long rowId, DbDataReader dbDataReader)
{
// Write the updated row to the end of the file
long newOffset = await AppendRowToBuffer(dbDataReader);
// Update the file offset of the row in question
fileOffsets[rowId] = newOffset;
}
///
/// Saves the contents of this result set to a file using the IFileStreamFactory provided
///
/// Parameters for saving the results to a file
///
/// Factory for creating a stream reader/writer combo for writing results to disk
///
/// Handler for a successful write of all rows
/// Handler for unsuccessful write of all rows
public void SaveAs(SaveResultsRequestParams saveParams, IFileStreamFactory fileFactory,
SaveAsAsyncEventHandler successHandler, SaveAsFailureAsyncEventHandler failureHandler)
{
// Sanity check the save params and file factory
Validate.IsNotNull(nameof(saveParams), saveParams);
Validate.IsNotNull(nameof(fileFactory), fileFactory);
// Make sure the resultset has finished being read
if (!hasBeenRead)
{
throw new InvalidOperationException(SR.QueryServiceSaveAsResultSetNotComplete);
}
// Make sure there isn't a task for this file already
Task existingTask;
if (SaveTasks.TryGetValue(saveParams.FilePath, out existingTask))
{
if (existingTask.IsCompleted)
{
// The task has completed, so let's attempt to remove it
if (!SaveTasks.TryRemove(saveParams.FilePath, out existingTask))
{
throw new InvalidOperationException(SR.QueryServiceSaveAsMiscStartingError);
}
}
else
{
// The task hasn't completed, so we shouldn't continue
throw new InvalidOperationException(SR.QueryServiceSaveAsInProgress);
}
}
// Create the new task
Task saveAsTask = new Task(async () =>
{
try
{
// Set row counts depending on whether save request is for entire set or a subset
long rowEndIndex = RowCount;
int rowStartIndex = 0;
if (saveParams.IsSaveSelection)
{
// ReSharper disable PossibleInvalidOperationException IsSaveSelection verifies these values exist
rowEndIndex = saveParams.RowEndIndex.Value + 1;
rowStartIndex = saveParams.RowStartIndex.Value;
// ReSharper restore PossibleInvalidOperationException
}
using (var fileReader = fileFactory.GetReader(outputFileName))
using (var fileWriter = fileFactory.GetWriter(saveParams.FilePath))
{
// Iterate over the rows that are in the selected row set
for (long i = rowStartIndex; i < rowEndIndex; ++i)
{
var row = fileReader.ReadRow(fileOffsets[i], i, Columns);
fileWriter.WriteRow(row, Columns);
}
if (successHandler != null)
{
await successHandler(saveParams);
}
}
}
catch (Exception e)
{
fileFactory.DisposeFile(saveParams.FilePath);
if (failureHandler != null)
{
await failureHandler(saveParams, e.Message);
}
}
});
// Add exception handling to the save task
Task taskWithHandling = saveAsTask.ContinueWithOnFaulted(async t =>
{
if (failureHandler != null)
{
await failureHandler(saveParams, t.Exception.Message);
}
});
// If saving the task fails, return a failure
if (!SaveTasks.TryAdd(saveParams.FilePath, taskWithHandling))
{
throw new InvalidOperationException(SR.QueryServiceSaveAsMiscStartingError);
}
// Task was saved, so start up the task
saveAsTask.Start();
}
#endregion
#region IDisposable Implementation
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposed)
{
return;
}
// Check if saveTasks are running for this ResultSet
if (!SaveTasks.IsEmpty)
{
// Wait for tasks to finish before disposing ResultSet
Task.WhenAll(SaveTasks.Values.ToArray()).ContinueWith(antecedent =>
{
if (disposing)
{
fileStreamFactory.DisposeFile(outputFileName);
}
disposed = true;
});
}
else
{
// If saveTasks is empty, continue with dispose
if (disposing)
{
fileStreamFactory.DisposeFile(outputFileName);
}
disposed = true;
}
}
#endregion
#region Private Helper Methods
///
/// If the result set represented by this class corresponds to a single XML
/// column that contains results of "for xml" query, set isXml = true
/// If the result set represented by this class corresponds to a single JSON
/// column that contains results of "for json" query, set isJson = true
///
private void SingleColumnXmlJsonResultSet()
{
if (Columns?.Length == 1 && RowCount != 0)
{
if (Columns[0].ColumnName.Equals(NameOfForXmlColumn, StringComparison.Ordinal))
{
Columns[0].IsXml = true;
isSingleColumnXmlJsonResultSet = true;
rowCountOverride = 1;
}
else if (Columns[0].ColumnName.Equals(NameOfForJsonColumn, StringComparison.Ordinal))
{
Columns[0].IsJson = true;
isSingleColumnXmlJsonResultSet = true;
rowCountOverride = 1;
}
}
}
///
/// Check columns for json type and set isJson if needed
///
private void CheckForIsJson()
{
if (Columns?.Length > 0 && RowCount != 0)
{
Regex regex = new Regex(@"({.*?})");
var row = GetRow(0);
for (int i = 0; i < Columns.Length; i++)
{
if (Columns[i].DataTypeName.Equals("nvarchar"))
{
if (regex.IsMatch(row[i].DisplayValue))
{
Columns[i].IsJson = true;
}
}
}
}
}
///
/// Determine the special action, if any, for this result set
///
private SpecialAction ProcessSpecialAction()
{
// Check if this result set is a showplan
if (Columns.Length == 1 && string.Compare(Columns[0].ColumnName, YukonXmlShowPlanColumn, StringComparison.OrdinalIgnoreCase) == 0)
{
specialAction.ExpectYukonXMLShowPlan = true;
}
return specialAction;
}
///
/// Adds a single row to the end of the buffer file. INTENDED FOR SINGLE ROW INSERTION ONLY.
///
/// An UNREAD db data reader
/// The offset into the file where the row was inserted
private async Task AppendRowToBuffer(DbDataReader dbDataReader)
{
Validate.IsNotNull(nameof(dbDataReader), dbDataReader);
if (!hasBeenRead)
{
throw new InvalidOperationException(SR.QueryServiceResultSetNotRead);
}
// NOTE: We are no longer checking to see if the data reader has rows before reading
// b/c of a quirk in SqlClient. In some scenarios, a SqlException isn't thrown until we
// read. In order to get appropriate errors back to the user, we'll read first.
// Returning false from .ReadAsync means there aren't any rows.
// Create a storage data reader, read it, make sure there were results
StorageDataReader dataReader = new StorageDataReader(dbDataReader);
if (!await dataReader.ReadAsync(CancellationToken.None))
{
throw new InvalidOperationException(SR.QueryServiceResultSetAddNoRows);
}
using (IFileStreamWriter writer = fileStreamFactory.GetWriter(outputFileName))
{
// Write the row to the end of the file
long currentFileOffset = totalBytesWritten;
writer.Seek(currentFileOffset);
totalBytesWritten += writer.WriteRow(dataReader);
return currentFileOffset;
}
}
#endregion
}
}