//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.EditData.Contracts;
using Microsoft.SqlTools.ServiceLayer.EditData.UpdateManagement;
using Microsoft.SqlTools.ServiceLayer.QueryExecution;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
using Microsoft.SqlTools.ServiceLayer.Utility;
using Microsoft.SqlTools.Utility;
namespace Microsoft.SqlTools.ServiceLayer.EditData
{
/// <summary>
/// Represents an edit "session" bound to the results of a query, containing a cache of edits
/// that are pending. Provides logic for performing edit operations.
/// </summary>
public class EditSession
{
// Result set that backs this session; assigned while the session initializes.
private ResultSet associatedResultSet;
// Factory used while the session initializes to look up metadata for the object being edited.
private readonly IEditMetadataFactory metadataFactory;
// Metadata for the object being edited; assigned while the session initializes.
private EditTableMetadata objectMetadata;
/// <summary>
/// Constructs a new, not yet initialized edit session that will use the provided factory
/// to look up metadata.
/// </summary>
/// <param name="metaFactory">Factory for creating metadata</param>
public EditSession(IEditMetadataFactory metaFactory)
{
    // The factory is mandatory; check it before touching any state
    Validate.IsNotNull(nameof(metaFactory), metaFactory);

    this.metadataFactory = metaFactory;
}
#region Properties
/// <summary>
/// Delegate awaited during initialization to obtain the connection that is handed to the
/// metadata factory.
/// </summary>
public delegate Task Connector();
/// <summary>
/// Delegate awaited during initialization to run the query that selects the rows to edit.
/// </summary>
/// <param name="query">Text of the query to run</param>
public delegate Task QueryRunner(string query);
/// <summary>
/// The task that is running to commit the changes to the db
/// Internal for unit test purposes.
/// </summary>
internal Task CommitTask { get; set; }
/// <summary>
/// The internal ID for the next row in the table. Internal for unit testing purposes only.
/// </summary>
internal long NextRowId { get; private set; }
/// <summary>
/// The cache of pending updates. Internal for unit test purposes only
/// </summary>
internal ConcurrentDictionary EditCache { get; private set; }
/// <summary>
/// The task that is running to initialize the edit session
/// </summary>
internal Task InitializeTask { get; set; }
/// <summary>
/// Whether or not the session has been initialized
/// </summary>
public bool IsInitialized { get; internal set; }
#endregion
#region Public Methods
/// <summary>
/// Initializes the edit session, asynchronously, by retrieving metadata about the table to
/// edit and querying the table for the rows of the table.
/// </summary>
/// <param name="initParams">Parameters for initializing the edit session</param>
/// <param name="connector">Delegate that will return a DbConnection when executed</param>
/// <param name="queryRunner">
/// Delegate that will run the requested query and return a
/// <see cref="EditSessionQueryExecutionState"/> object on execution
/// </param>
/// <param name="successHandler">Func to call when initialization has completed successfully</param>
/// <param name="errorHandler">Func to call when initialization has completed with errors</param>
/// <exception cref="InvalidOperationException">
/// When session is already initialized or in progress of initializing
/// </exception>
public void Initialize(EditInitializeParams initParams, Connector connector, QueryRunner queryRunner, Func successHandler, Func errorHandler)
{
    if (IsInitialized)
    {
        throw new InvalidOperationException(SR.EditDataSessionAlreadyInitialized);
    }
    if (InitializeTask != null)
    {
        throw new InvalidOperationException(SR.EditDataSessionAlreadyInitializing);
    }

    // Check the parameter object itself first so that a missing one is reported by name
    // instead of failing on the property dereferences below
    Validate.IsNotNull(nameof(initParams), initParams);
    Validate.IsNotNullOrWhitespaceString(nameof(initParams.ObjectName), initParams.ObjectName);
    Validate.IsNotNullOrWhitespaceString(nameof(initParams.ObjectType), initParams.ObjectType);
    Validate.IsNotNull(nameof(initParams.Filters), initParams.Filters);
    Validate.IsNotNull(nameof(connector), connector);
    Validate.IsNotNull(nameof(queryRunner), queryRunner);
    Validate.IsNotNull(nameof(successHandler), successHandler);
    Validate.IsNotNull(nameof(errorHandler), errorHandler);

    // Start up the initialize process. The task is kept so that a second call can be rejected.
    InitializeTask = InitializeInternal(initParams, connector, queryRunner, successHandler, errorHandler);
}
/// <summary>
/// Validates that a query can be used for an edit session. The target result set is returned
/// </summary>
/// <param name="query">The query to validate</param>
/// <returns>The result set to use</returns>
/// <exception cref="InvalidOperationException">
/// If the query has not finished executing, or does not have exactly one result set
/// </exception>
public static ResultSet ValidateQueryForSession(Query query)
{
    Validate.IsNotNull(nameof(query), query);

    // Determine if the query is valid for editing
    // Criterion 1) Query has finished executing
    if (!query.HasExecuted)
    {
        throw new InvalidOperationException(SR.EditDataQueryNotCompleted);
    }

    // Criterion 2) Query only has a single result set, counted across all of its batches
    ResultSet[] queryResultSets = query.Batches.SelectMany(b => b.ResultSets).ToArray();
    if (queryResultSets.Length != 1)
    {
        throw new InvalidOperationException(SR.EditDataQueryImproperResultSets);
    }

    // Return the set that was just counted. It does not have to belong to the first batch,
    // so indexing Batches[0].ResultSets[0] would fail when an earlier batch produced none.
    return queryResultSets[0];
}
/// <summary>
/// Creates a new row update and adds it to the update cache
/// </summary>
/// <exception cref="InvalidOperationException">If inserting into cache fails</exception>
/// <returns>
/// The internal ID of the newly created row, together with the value to display for each column
/// </returns>
public EditCreateRowResult CreateRow()
{
    ThrowIfNotInitialized();

    // Claim the next row ID for the row being created.
    // NOTE(review): this is a read followed by a write on the property, not an interlocked
    // increment - confirm whether concurrent callers are expected here.
    long rowId = NextRowId++;

    RowCreate createdRow = new RowCreate(rowId, associatedResultSet, objectMetadata);
    if (!EditCache.TryAdd(rowId, createdRow))
    {
        // The ID was never used, so hand it back before reporting the failure
        NextRowId--;
        throw new InvalidOperationException(SR.EditDataFailedAddRow);
    }

    // Work out what every cell of the new row should display
    int columnCount = objectMetadata.Columns.Length;
    string[] displayDefaults = new string[columnCount];
    for (int ordinal = 0; ordinal < columnCount; ordinal++)
    {
        EditColumnMetadata column = objectMetadata.Columns[ordinal];

        // Calculated columns only ever show the placeholder; no value is set on the row
        if (column.IsCalculated.HasTrue())
        {
            displayDefaults[ordinal] = SR.EditDataComputedColumnPlaceholder;
            continue;
        }

        // Apply the default to the row when there is one, and report it (or null) either way
        string defaultValue = column.DefaultValue;
        if (defaultValue != null)
        {
            createdRow.SetCell(ordinal, defaultValue);
        }
        displayDefaults[ordinal] = defaultValue;
    }

    return new EditCreateRowResult
    {
        NewRowId = rowId,
        DefaultValues = displayDefaults
    };
}
/// <summary>
/// Commits the edits in the cache to the database and then to the associated result set of
/// this edit session. This is launched asynchronously.
/// </summary>
/// <param name="connection">The connection to use for executing the query</param>
/// <param name="successHandler">Callback to perform when the commit process has finished</param>
/// <param name="errorHandler">Callback to perform if the commit process has failed at some point</param>
/// <exception cref="InvalidOperationException">
/// If the session is not initialized, or a previously started commit has not completed
/// </exception>
public void CommitEdits(DbConnection connection, Func successHandler, Func errorHandler)
{
    ThrowIfNotInitialized();

    Validate.IsNotNull(nameof(connection), connection);
    Validate.IsNotNull(nameof(successHandler), successHandler);
    Validate.IsNotNull(nameof(errorHandler), errorHandler);

    // Only one commit may be in flight at a time
    Task previousCommit = CommitTask;
    bool commitRunning = previousCommit != null && !previousCommit.IsCompleted;
    if (commitRunning)
    {
        throw new InvalidOperationException(SR.EditDataCommitInProgress);
    }

    // Kick off the commit and keep the task so the next call can check on it
    CommitTask = CommitEditsInternal(connection, successHandler, errorHandler);
}
/// <summary>
/// Creates a delete row update and adds it to the update cache
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// If the row ID is negative or not below the next row ID
/// </exception>
/// <exception cref="InvalidOperationException">
/// If row requested to delete already has a pending change in the cache
/// </exception>
/// <param name="rowId">The internal ID of the row to delete</param>
public void DeleteRow(long rowId)
{
    ThrowIfNotInitialized();

    // Valid row IDs run from 0 up to, but not including, the next row ID
    bool idInRange = rowId >= 0 && rowId < NextRowId;
    if (!idInRange)
    {
        throw new ArgumentOutOfRangeException(nameof(rowId), SR.EditDataRowOutOfRange);
    }

    // Queue the delete; the add is refused when the row already has an entry in the cache
    RowDelete pendingDelete = new RowDelete(rowId, associatedResultSet, objectMetadata);
    bool added = EditCache.TryAdd(rowId, pendingDelete);
    if (added)
    {
        return;
    }
    throw new InvalidOperationException(SR.EditDataUpdatePending);
}
/// <summary>
/// Retrieves a subset of rows with the pending updates applied. If more rows than exist
/// are requested, only the rows that exist will be returned.
/// </summary>
/// <param name="startIndex">Index to start returning rows from</param>
/// <param name="rowCount">The number of rows to return.</param>
/// <returns>An array of rows with pending edits applied</returns>
public async Task GetRows(long startIndex, int rowCount)
{
    ThrowIfNotInitialized();

    // Get the cached rows from the result set. A start index at or past the end of the
    // result set yields an empty subset rather than a call into the result set.
    ResultSetSubset cachedRows = startIndex < associatedResultSet.RowCount
        ? await associatedResultSet.GetSubset(startIndex, rowCount)
        : new ResultSetSubset
        {
            RowCount = 0,
            Rows = new DbCellValue[][] { }
        };

    // Convert the rows into EditRows and apply the changes we have
    List editRows = new List();
    for (int i = 0; i < cachedRows.RowCount; i++)
    {
        long rowId = i + startIndex;
        RowEditBase edr;
        if (EditCache.TryGetValue(rowId, out edr))
        {
            // Ask the edit object to generate an edit row
            editRows.Add(edr.GetEditRow(cachedRows.Rows[i]));
        }
        else
        {
            // Package up the existing row into a clean edit row
            EditRow er = new EditRow
            {
                Id = rowId,
                Cells = cachedRows.Rows[i].Select(cell => new EditCell(cell, false)).ToArray(),
                State = EditRow.EditRowState.Clean
            };
            editRows.Add(er);
        }
    }

    // If the requested range of rows was at the end of the original cell set and we have
    // added new rows, we need to reflect those changes
    if (rowCount > cachedRows.RowCount)
    {
        long endIndex = startIndex + cachedRows.RowCount;

        // The cache does not enumerate in row ID order, so sort before taking the page.
        // Otherwise the wrong rows could be selected, or returned out of order.
        var newRows = EditCache
            .Where(edit => edit.Key >= endIndex)
            .OrderBy(edit => edit.Key)
            .Take(rowCount - cachedRows.RowCount);
        editRows.AddRange(newRows.Select(newRow => newRow.Value.GetEditRow(null)));
    }

    return editRows.ToArray();
}
/// <summary>
/// Reverts a cell in a pending edit
/// </summary>
/// <param name="rowId">Internal ID of the row to have its edits reverted</param>
/// <param name="columnId">Ordinal ID of the column to revert</param>
/// <returns>The result produced by the pending edit when it reverted the cell</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// If there is no pending edit for the given row ID
/// </exception>
public EditRevertCellResult RevertCell(long rowId, int columnId)
{
    ThrowIfNotInitialized();

    // There has to be a pending edit for the row before anything can be reverted
    RowEditBase rowEdit;
    bool hasPendingEdit = EditCache.TryGetValue(rowId, out rowEdit);
    if (!hasPendingEdit)
    {
        throw new ArgumentOutOfRangeException(nameof(rowId), SR.EditDataUpdateNotPending);
    }

    // Let the edit revert the cell, then drop the edit if that left the row clean
    EditRevertCellResult result = rowEdit.RevertCell(columnId);
    CleanupEditIfRowClean(rowId, result);
    return result;
}
/// <summary>
/// Removes a pending row update from the update cache.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// If a pending row update with the given row ID does not exist.
/// </exception>
/// <param name="rowId">The internal ID of the row to reset</param>
public void RevertRow(long rowId)
{
    ThrowIfNotInitialized();

    // Dropping the cache entry is the whole revert; the removed edit itself is not needed
    RowEditBase discarded;
    bool removed = EditCache.TryRemove(rowId, out discarded);
    if (removed)
    {
        return;
    }
    throw new ArgumentOutOfRangeException(nameof(rowId), SR.EditDataUpdateNotPending);
}
/// <summary>
/// Generates a single script file with all the pending edits scripted.
/// </summary>
/// <param name="outputPath">The path to output the script to</param>
/// <returns>The path that the script was written to</returns>
/// <exception cref="ArgumentNullException">
/// If <paramref name="outputPath"/> is null, empty, or only whitespace
/// </exception>
public string ScriptEdits(string outputPath)
{
    ThrowIfNotInitialized();

    // Validate the output path
    // @TODO: Fall back to a temporary file when no path is provided, once we have an
    //        interface around file generation
    if (string.IsNullOrWhiteSpace(outputPath))
    {
        // If output path is missing or blank, that's an error
        throw new ArgumentNullException(nameof(outputPath), SR.EditDataScriptFilePathNull);
    }

    // Open a handle to the output file. FileMode.Create truncates a file that already
    // exists, so a shorter script never leaves the tail of an older one behind.
    using (FileStream outputStream = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.None))
    using (TextWriter outputWriter = new StreamWriter(outputStream))
    {
        // Convert each update in the cache into an insert/update/delete statement
        foreach (RowEditBase rowEdit in EditCache.Values)
        {
            outputWriter.WriteLine(rowEdit.GetScript());
        }
    }

    // Return the location of the generated script
    return outputPath;
}
/// <summary>
/// Performs an update to a specific cell in a row. If the row has not already been
/// initialized with a record in the update cache, one is created.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// If the row that is requested to be edited is beyond the rows in the results and the
/// rows that are being added.
/// </exception>
/// <param name="rowId">The internal ID of the row to edit</param>
/// <param name="columnId">The ordinal of the column to edit in the row</param>
/// <param name="newValue">The new string value of the cell to update</param>
/// <returns>The result produced by the row edit when the cell was set</returns>
public EditUpdateCellResult UpdateCell(long rowId, int columnId, string newValue)
{
    ThrowIfNotInitialized();

    // Valid row IDs run from 0 up to, but not including, the next row ID
    bool idInRange = rowId >= 0 && rowId < NextRowId;
    if (!idInRange)
    {
        throw new ArgumentOutOfRangeException(nameof(rowId), SR.EditDataRowOutOfRange);
    }

    // Reuse the pending edit for this row when there is one. The factory lambda is
    // deliberate: per the original author's note, a RowUpdate may only be constructed for
    // a row that exists in the result set, so it must not be built unless it is needed.
    RowEditBase pendingEdit = EditCache.GetOrAdd(rowId, id => new RowUpdate(id, associatedResultSet, objectMetadata));

    // Apply the new value, then drop the edit if that left the row clean
    EditUpdateCellResult updateResult = pendingEdit.SetCell(columnId, newValue);
    CleanupEditIfRowClean(rowId, updateResult);
    return updateResult;
}
#endregion
#region Private Helpers
/// <summary>
/// Performs the initialization work: looks up the object's metadata, runs the query for its
/// rows, and sets up the session state. Any exception raised along the way, including one
/// thrown by <paramref name="successHandler"/>, is passed to <paramref name="failureHandler"/>.
/// </summary>
/// <param name="initParams">Parameters for initializing the edit session</param>
/// <param name="connector">Delegate awaited to obtain the connection for the metadata lookup</param>
/// <param name="queryRunner">Delegate awaited to run the query for the rows</param>
/// <param name="successHandler">Awaited once the session state has been set up</param>
/// <param name="failureHandler">Awaited with the exception when any step fails</param>
private async Task InitializeInternal(EditInitializeParams initParams, Connector connector,
    QueryRunner queryRunner, Func successHandler, Func failureHandler)
{
    try
    {
        // Step 1) Look up the SMO metadata
        string[] namedParts = SqlScriptFormatter.DecodeMultipartIdenfitier(initParams.ObjectName);
        objectMetadata = metadataFactory.GetObjectMetadata(await connector(), namedParts,
            initParams.ObjectType);

        // Step 2) Get and execute a query for the rows in the object we're looking up
        EditSessionQueryExecutionState state = await queryRunner(ConstructInitializeQuery(objectMetadata, initParams.Filters));
        if (state.Query == null)
        {
            string message = state.Message ?? SR.EditDataQueryFailed;
            throw new Exception(message);
        }

        // Step 3) Setup the internal state
        associatedResultSet = ValidateQueryForSession(state.Query);
        objectMetadata.Extend(associatedResultSet.Columns);
        NextRowId = associatedResultSet.RowCount;
        EditCache = new ConcurrentDictionary();

        // Flip the flag only after everything above has succeeded. A failure while extending
        // the metadata must not leave the session reporting itself as initialized, and
        // callers that see the flag must also see fully prepared state.
        IsInitialized = true;

        // Step 4) Return our success
        await successHandler();
    }
    catch (Exception e)
    {
        await failureHandler(e);
    }
}
/// <summary>
/// Performs the commit work: executes the command of every pending edit against the
/// connection, applies the outcome through the edit, and removes the edit from the cache.
/// Any exception raised along the way, including one thrown by
/// <paramref name="successHandler"/>, is passed to <paramref name="errorHandler"/>.
/// </summary>
/// <param name="connection">The connection to use for executing the commands</param>
/// <param name="successHandler">Awaited after every pending edit has been processed</param>
/// <param name="errorHandler">Awaited with the exception when any step fails</param>
private async Task CommitEditsInternal(DbConnection connection, Func successHandler, Func errorHandler)
{
try
{
// @TODO: Add support for transactional commits
// Trust the RowEdit to sort itself appropriately
// Work on a snapshot of the cache values so entries can be removed while looping
var editOperations = EditCache.Values.ToList();
editOperations.Sort();
foreach (var editOperation in editOperations)
{
// Get the command from the edit operation and execute it
using (DbCommand editCommand = editOperation.GetCommand(connection))
using (DbDataReader reader = await editCommand.ExecuteReaderAsync())
{
// Apply the changes of the command to the result set
await editOperation.ApplyChanges(reader);
}
// If we succeeded in applying the changes, then remove this from the cache
// @TODO: Prevent edit sessions from being modified while a commit is in progress
// NOTE(review): edits processed before a failure have already been executed and
// removed; with no transaction (see TODO above) they are not undone here.
RowEditBase re;
EditCache.TryRemove(editOperation.RowId, out re);
}
await successHandler();
}
catch (Exception e)
{
await errorHandler(e);
}
}
/// <summary>
/// Constructs a query for selecting rows in a table based on the filters provided.
/// Internal for unit testing purposes only.
/// </summary>
/// <param name="metadata">Supplies the escaped column names and the escaped multipart object name</param>
/// <param name="initFilters">Filters to apply; only the row limit is consulted here</param>
/// <returns>The text of the SELECT statement</returns>
/// <exception cref="ArgumentOutOfRangeException">If the requested row limit is negative</exception>
internal static string ConstructInitializeQuery(EditTableMetadata metadata, EditInitializeFiltering initFilters)
{
    StringBuilder query = new StringBuilder("SELECT ");

    // An optional row limit becomes a TOP clause; negative limits are refused
    var rowLimit = initFilters.LimitResults;
    if (rowLimit.HasValue)
    {
        if (rowLimit < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(initFilters.LimitResults), SR.EditDataFilteringNegativeLimit);
        }
        query.AppendFormat("TOP {0} ", rowLimit.Value);
    }

    // Every known column is selected by its escaped name, followed by the FROM clause
    string columnList = string.Join(", ", metadata.Columns.Select(column => column.EscapedName));
    query.Append(columnList)
         .AppendFormat(" FROM {0}", metadata.EscapedMultipartName);

    return query.ToString();
}
/// <summary>
/// Guard for operations that need an initialized session.
/// </summary>
/// <exception cref="InvalidOperationException">If the session has not been initialized</exception>
private void ThrowIfNotInitialized()
{
    if (IsInitialized)
    {
        return;
    }
    throw new InvalidOperationException(SR.EditDataSessionNotInitialized);
}
/// <summary>
/// Removes the edit from the edit cache if the row is no longer dirty
/// </summary>
/// <param name="rowId">ID of the update to cleanup</param>
/// <param name="editCellResult">Result with row dirty flag</param>
private void CleanupEditIfRowClean(long rowId, EditCellResult editCellResult)
{
    if (!editCellResult.IsRowDirty)
    {
        // Best-effort removal of the now clean edit; the outcome is deliberately ignored.
        // If this fails, it'll be handled on commit attempt.
        RowEditBase discarded;
        EditCache.TryRemove(rowId, out discarded);
    }
}
#endregion
/// <summary>
/// State object to return upon completion of an edit session initialization query
/// </summary>
public class EditSessionQueryExecutionState
{
    /// <summary>
    /// The query object that was used to execute the edit initialization query. If
    /// null the query was not successfully executed.
    /// </summary>
    public Query Query { get; set; }

    /// <summary>
    /// Any message that may have occurred during execution of the query (ie, exceptions).
    /// May be null; presumably the error messages were then returned via message events.
    /// </summary>
    public string Message { get; set; }

    /// <summary>
    /// Constructs a new instance. Sets the values of the properties.
    /// </summary>
    /// <param name="query">Value for <see cref="Query"/></param>
    /// <param name="message">Value for <see cref="Message"/>; null when omitted</param>
    public EditSessionQueryExecutionState(Query query, string message = null)
    {
        this.Query = query;
        this.Message = message;
    }
}
}
}