//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
#nullable disable
using Microsoft.SqlServer.DataCollection.Common.Contracts.Advisor;
using Microsoft.SqlServer.DataCollection.Common.Contracts.ErrorHandling;
using Microsoft.SqlServer.DataCollection.Common.Contracts.SqlQueries;
using Microsoft.SqlServer.DataCollection.Common.ErrorHandling;
using Microsoft.SqlServer.Migration.SkuRecommendation;
using Microsoft.SqlServer.Migration.SkuRecommendation.Contracts.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace Microsoft.SqlTools.Migration
{
///
/// Controller to manage the collection, aggregation, and persistence of SQL performance and static data for SKU recommendation.
///
public class SqlDataQueryController : IDisposable
{
// Timers to control performance and static data collection intervals
private IList timers = new List() { };
private int perfQueryIntervalInSec;
private int numberOfIterations;
// Output folder to store data in
private string outputFolder;
// Name of the server handled by this controller
private string serverName;
// Data collector and cache
private DataPointsCollector dataCollector = null;
private SqlPerfDataPointsCache perfDataCache = null;
// Whether or not this controller has been disposed
private bool disposedValue = false;
private ISqlAssessmentLogger _logger;
// since this "console app" doesn't have any console to write to, store any messages so that they can be periodically fetched
private List> messages;
private List> errors;
///
/// Create a new SqlDataQueryController.
///
/// SQL connection string
/// Output folder to save results to
/// Interval, in seconds, at which perf counters are collected
/// Number of iterations of perf counter collection before aggreagtion
/// Interval, in seconds, at which static/common counters are colltected
/// Logger
public SqlDataQueryController(
string connectionString,
string outputFolder,
int perfQueryIntervalInSec,
int numberOfIterations,
int staticQueryIntervalInSec,
ISqlAssessmentLogger logger = null)
{
this.outputFolder = outputFolder;
this.perfQueryIntervalInSec = perfQueryIntervalInSec;
this.numberOfIterations = numberOfIterations;
this._logger = logger ?? new DefaultPerfDataCollectionLogger();
this.messages = new List>();
this.errors = new List>();
perfDataCache = new SqlPerfDataPointsCache(this.outputFolder, _logger);
dataCollector = new DataPointsCollector(new string[] { connectionString }, _logger);
// set up timers to run perf/static collection at specified intervals
System.Timers.Timer perfDataCollectionTimer = new System.Timers.Timer();
perfDataCollectionTimer.Elapsed += (sender, e) => PerfDataQueryEvent();
perfDataCollectionTimer.Interval = perfQueryIntervalInSec * 1000;
timers.Add(perfDataCollectionTimer);
System.Timers.Timer staticDataCollectionTimer = new System.Timers.Timer();
staticDataCollectionTimer.Elapsed += (sender, e) => StaticDataQueryAndPersistEvent();
staticDataCollectionTimer.Interval = staticQueryIntervalInSec * 1000;
timers.Add(staticDataCollectionTimer);
}
///
/// Start this SqlDataQueryController.
///
public void Start()
{
foreach (var timer in timers)
{
timer.Start();
}
}
///
/// Returns whether or not this SqlDataQueryController is currently running.
///
public bool IsRunning()
{
return this.timers.All(timer => timer.Enabled);
}
///
/// Collect performance data, adding the collected points to the cache.
///
private void PerfDataQueryEvent()
{
try
{
int currentIteration = perfDataCache.CurrentIteration;
// Get raw perf data points
var validationResult = dataCollector.CollectPerfDataPoints(CancellationToken.None, TimeSpan.FromSeconds(this.perfQueryIntervalInSec)).Result.FirstOrDefault();
if (validationResult != null && validationResult.Status == SqlAssessmentStatus.Completed)
{
IList result = validationResult.SqlPerfDataPoints;
perfDataCache.AddingPerfData(result);
serverName = this.perfDataCache.ServerName;
this.messages.Add(new KeyValuePair(
string.Format("Performance data query iteration: {0} of {1}, collected {2} data points.", currentIteration, numberOfIterations, result.Count),
DateTime.UtcNow));
// perform aggregation and persistence once enough iterations have completed
if (currentIteration == numberOfIterations)
{
PerfDataAggregateAndPersistEvent();
}
}
else if (validationResult != null && validationResult.Status == SqlAssessmentStatus.Error)
{
var error = validationResult.Errors.FirstOrDefault();
Logging(error);
}
}
catch (Exception e)
{
Logging(e);
}
}
///
/// Aggregate and persist the cached points, saving the aggregated points to disk.
///
internal void PerfDataAggregateAndPersistEvent()
{
try
{
// Aggregate the records in the Cache
int rawDataPointsCount = this.perfDataCache.GetRawDataPointsCount();
this.perfDataCache.AggregatingPerfData();
int aggregatedDataPointsCount = this.perfDataCache.GetAggregatedDataPointsCount();
// Persist into local csv.
if (aggregatedDataPointsCount > 0)
{
this.perfDataCache.PersistingCacheAsCsv();
this.messages.Add(new KeyValuePair(
string.Format("Aggregated {0} raw data points to {1} performance counters, and saved to {2}.", rawDataPointsCount, aggregatedDataPointsCount, this.outputFolder),
DateTime.UtcNow));
}
}
catch (Exception e)
{
Logging(e);
}
}
///
/// Collect and persist static data, saving the collected points to disk.
///
private void StaticDataQueryAndPersistEvent()
{
try
{
var validationResult = this.dataCollector.CollectCommonDataPoints(CancellationToken.None).Result.FirstOrDefault();
if (validationResult != null && validationResult.Status == SqlAssessmentStatus.Completed)
{
// Common data result
IList staticDataResult = new List();
staticDataResult.Add(validationResult.SqlCommonDataPoints);
serverName = staticDataResult.Select(p => p.ServerName).FirstOrDefault();
// Save to csv
var persistor = new DataPointsPersistor(this.outputFolder);
serverName = staticDataResult.Select(p => p.ServerName).FirstOrDefault();
persistor.SaveCommonDataPoints(staticDataResult, serverName);
this.messages.Add(new KeyValuePair(
string.Format("Collected static configuration data, and saved to {0}.", this.outputFolder),
DateTime.UtcNow));
}
else if (validationResult != null && validationResult.Status == SqlAssessmentStatus.Error)
{
var error = validationResult.Errors.FirstOrDefault();
Logging(error);
}
}
catch (Exception e)
{
Logging(e);
}
}
///
/// Log exceptions to file.
///
/// Exception to log
private void Logging(Exception ex)
{
this.errors.Add(new KeyValuePair(ex.Message, DateTime.UtcNow));
var error = new UnhandledSqlExceptionErrorModel(ex, ErrorScope.General);
_logger.Log(error, ErrorLevel.Error, TelemetryScope.PerfCollection);
_logger.Log(TelemetryScope.PerfCollection, ex.Message);
}
///
/// Log errors to file.
///
/// Error to log
private void Logging(IErrorModel error)
{
this.errors.Add(new KeyValuePair(error.RawException.Message, DateTime.UtcNow));
_logger.Log(error, ErrorLevel.Error, TelemetryScope.PerfCollection);
_logger.Log(TelemetryScope.PerfCollection, error.RawException.Message);
}
///
/// Fetches the latest messages, and then clears the message list.
///
/// Only return messages from after this time
/// List of queued messages
public List FetchLatestMessages(DateTime startTime)
{
List latestMessages = this.messages.Where(kvp => kvp.Value > startTime).Select(kvp => kvp.Key).ToList();
this.messages.Clear();
return latestMessages;
}
///
/// Fetches the latest messages, and then clears the message list.
///
/// Only return messages from after this time
/// List of queued errors
public List FetchLatestErrors(DateTime startTime)
{
List latestErrors = this.errors.Where(kvp => kvp.Value > startTime).Select(kvp => kvp.Key).ToList();
this.messages.Clear();
return latestErrors;
}
///
/// Dispose of this SqlDataQueryController.
///
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
foreach (var timer in timers)
{
timer.Stop();
}
if (perfDataCache.CurrentIteration > 2)
{
PerfDataAggregateAndPersistEvent(); // flush cache if there are enough data points
}
this.perfDataCache = null;
}
disposedValue = true;
}
}
}
///
/// Cache to store intermediate SQL performance data before it is aggregated and persisted for SKU recommendation.
///
public class SqlPerfDataPointsCache
{
public string ServerName { get; private set; }
public int CurrentIteration { get; private set; }
private string outputFolder;
private ISqlAssessmentLogger logger;
private IList> perfDataPoints = new List>();
private IList perfAggregated = new List();
///
/// Create a new SqlPerfDataPointsCache.
///
/// Output folder to save results to
/// Logger
public SqlPerfDataPointsCache(string outputFolder, ISqlAssessmentLogger logger = null)
{
this.outputFolder = outputFolder;
this.logger = logger ?? new DefaultPerfDataCollectionLogger();
CurrentIteration = 1;
}
///
/// Add the collected data points to the cache.
///
/// Collected data points
public void AddingPerfData(IList result)
{
ServerName = result.Select(p => p.ServerName).FirstOrDefault();
perfDataPoints.Add(result);
CurrentIteration++;
}
///
/// Return the number of raw data points.
///
public int GetRawDataPointsCount()
{
// flatten list
return perfDataPoints.SelectMany(x => x).Count();
}
///
/// Return the number of aggregated data points.
///
public int GetAggregatedDataPointsCount()
{
return perfAggregated.Count;
}
///
/// Aggregate the cached data points.
///
public void AggregatingPerfData()
{
try
{
var aggregator = new CounterAggregator(logger);
perfAggregated = aggregator.AggregateDatapoints(perfDataPoints);
}
catch (Exception ex)
{
throw ex;
}
finally
{
perfDataPoints.Clear();
// reset the iteration counter
CurrentIteration = 1;
}
}
///
/// Save the cached and aggregated data points to disk.
///
public void PersistingCacheAsCsv()
{
// Save to csv
var persistor = new DataPointsPersistor(outputFolder);
persistor.SavePerfDataPoints(perfAggregated, machineId: ServerName, overwrite: false);
}
}
}