using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Kusto.Cloud.Platform.Data;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Data;
using Kusto.Data.Exceptions;
using Kusto.Data.Net.Client;
using Kusto.Language;
using Kusto.Language.Editor;
using Microsoft.Data.SqlClient;
using Microsoft.Kusto.ServiceLayer.Connection;
using Microsoft.Kusto.ServiceLayer.DataSource.DataSourceIntellisense;
using Microsoft.Kusto.ServiceLayer.Utility;
namespace Microsoft.Kusto.ServiceLayer.DataSource
{
public class KustoClient : IKustoClient
{
// Owner URI supplied to the constructor; handed to ConnectionService.Instance.RefreshAzureToken
// when a fresh Azure token is needed.
private readonly string _ownerUri;
// Provider used by ExecuteControlCommand; (re)created by Initialize.
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private ICslAdminProvider _kustoAdminProvider;
// Provider used by ExecuteQuery; (re)created by Initialize.
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private ICslQueryProvider _kustoQueryProvider;
/// <summary>
/// SchemaState used for getting intellisense info.
/// Rebuilt by LoadSchemaState in the constructor and on every UpdateDatabase call.
/// </summary>
public GlobalState SchemaState { get; private set; }
/// <summary>
/// Absolute cluster URL computed by GetClusterName from the connection string's data source.
/// </summary>
public string ClusterName { get; }
/// <summary>
/// Database currently targeted by this client; set in the constructor and changed by UpdateDatabase.
/// </summary>
public string DatabaseName { get; private set; }
/// <summary>
/// Creates a client for the cluster named in <paramref name="connectionString"/>, picks the
/// database to work against and loads its schema for intellisense.
/// </summary>
/// <param name="connectionString">Connection string whose data source names the cluster and whose initial catalog (optional) names the database.</param>
/// <param name="azureAccountToken">Azure account token used to authenticate the providers.</param>
/// <param name="ownerUri">Owner URI used later when the token has to be refreshed.</param>
public KustoClient(string connectionString, string azureAccountToken, string ownerUri)
{
    _ownerUri = ownerUri;
    ClusterName = GetClusterName(connectionString);

    string requestedDatabase = new SqlConnectionStringBuilder(connectionString).InitialCatalog;

    // The providers must exist before any query (first-database lookup, schema load) can run.
    Initialize(ClusterName, requestedDatabase, azureAccountToken);

    if (string.IsNullOrWhiteSpace(requestedDatabase))
    {
        // No database requested: fall back to the first one the cluster reports.
        DatabaseName = GetFirstDatabaseName();
    }
    else
    {
        DatabaseName = requestedDatabase;
    }

    SchemaState = LoadSchemaState();
}
/// <summary>
/// Builds the intellisense schema state for the current database by running the
/// ".show database ... schema" and ".show functions" control commands in parallel and
/// merging their results into GlobalState.Default.
/// </summary>
/// <returns>The GlobalState returned by KustoIntellisenseHelper.AddOrUpdateDatabase.</returns>
private GlobalState LoadSchemaState()
{
// The token source is only used to supply a token; nothing in this method cancels it.
// NOTE(review): it is never disposed here - consider a using block.
CancellationTokenSource source = new CancellationTokenSource();
// NOTE(review): the element types of these sequences (and the type arguments of the
// ExecuteControlCommandAsync calls below) are not visible in this view - confirm against the original file.
IEnumerable tableSchemas = Enumerable.Empty();
IEnumerable functionSchemas = Enumerable.Empty();
// Both commands are issued concurrently; each lambda blocks on .Result, and
// Parallel.Invoke returns once both have finished.
Parallel.Invoke(() =>
{
// throwOnError is false, so a failed command yields null here and replaces the empty default.
tableSchemas = ExecuteControlCommandAsync(
$".show database {DatabaseName} schema",
false, source.Token).Result;
}, () =>
{
// Same as above: a failure results in null rather than an exception.
functionSchemas = ExecuteControlCommandAsync(".show functions", false,
source.Token).Result;
});
// NOTE(review): either argument may be null at this point - verify AddOrUpdateDatabase tolerates that.
return KustoIntellisenseHelper.AddOrUpdateDatabase(tableSchemas, functionSchemas,
GlobalState.Default,
DatabaseName, ClusterName);
}
/// <summary>
/// Creates the query and admin providers for the given cluster and database, authenticating
/// with the supplied Azure account token.
/// </summary>
/// <param name="clusterName">Cluster URL used as the data source.</param>
/// <param name="databaseName">Initial catalog; may be blank.</param>
/// <param name="azureAccountToken">User token used for authentication.</param>
private void Initialize(string clusterName, string databaseName, string azureAccountToken)
{
    // Application id/key authentication is not used here, so both are passed as empty strings.
    KustoConnectionStringBuilder connectionSettings = GetKustoConnectionStringBuilder(
        clusterName,
        databaseName,
        userToken: azureAccountToken,
        applicationClientId: "",
        applicationKey: "");

    _kustoQueryProvider = KustoClientFactory.CreateCslQueryProvider(connectionSettings);
    _kustoAdminProvider = KustoClientFactory.CreateCslAdminProvider(connectionSettings);
}
/// <summary>
/// Requests a fresh Azure token for this connection and rebuilds both providers with it.
/// </summary>
/// <remarks>
/// The replacement providers are created before the old ones are disposed, so a failure while
/// building them (for example a blank refreshed token failing validation) does not leave the
/// client holding disposed providers.
/// </remarks>
private void RefreshAzureToken()
{
    string azureAccountToken = ConnectionService.Instance.RefreshAzureToken(_ownerUri);

    ICslQueryProvider staleQueryProvider = _kustoQueryProvider;
    ICslAdminProvider staleAdminProvider = _kustoAdminProvider;

    try
    {
        Initialize(ClusterName, DatabaseName, azureAccountToken);
    }
    finally
    {
        // Dispose only the providers that were actually replaced; one that is still
        // referenced by its field stays alive and usable.
        if (!ReferenceEquals(staleQueryProvider, _kustoQueryProvider))
        {
            staleQueryProvider?.Dispose();
        }

        if (!ReferenceEquals(staleAdminProvider, _kustoAdminProvider))
        {
            staleAdminProvider?.Dispose();
        }
    }
}
/// <summary>
/// Extracts the cluster URL from the connection string. The string looks like the following:
/// "Data Source=clustername.kusto.windows.net;User ID=;Password=;Pooling=False;Application Name=azdata-GeneralConnection"
/// </summary>
/// <param name="connectionString">A connection string coming over the Data management protocol</param>
/// <returns>The absolute http or https URL of the cluster.</returns>
/// <exception cref="ArgumentException">The data source cannot be turned into an http(s) URL.</exception>
private string GetClusterName(string connectionString)
{
    string dataSource = new SqlConnectionStringBuilder(connectionString).DataSource;

    // Use the data source as-is when it already is an absolute http(s) URL.
    if (TryCreateHttpUri(dataSource, out Uri uri))
    {
        return uri.AbsoluteUri;
    }

    // If there is no https:// prefix, add it. This must also be attempted when the bare value
    // happens to parse as an absolute URI with some other "scheme": a scheme-less
    // "host:port" value parses with the host name as its scheme. Values that carry an
    // explicit non-http scheme ("xyz://...") are still rejected.
    if (dataSource.IndexOf("://", StringComparison.Ordinal) < 0
        && TryCreateHttpUri("https://" + dataSource, out uri))
    {
        return uri.AbsoluteUri;
    }

    throw new ArgumentException("Expected a URL of the form clustername.kusto.windows.net");
}

// Parses candidate as an absolute URI and accepts it only when the scheme is http or https.
private static bool TryCreateHttpUri(string candidate, out Uri uri)
{
    return Uri.TryCreate(candidate, UriKind.Absolute, out uri)
        && (uri.Scheme == Uri.UriSchemeHttp || uri.Scheme == Uri.UriSchemeHttps);
}
/// <summary>
/// Builds a federated-security Kusto connection string builder for the given cluster.
/// Authentication must be supplied either as a user token or as an application client id
/// together with an application key.
/// </summary>
/// <param name="clusterName">Cluster URL used as the data source; must not be null.</param>
/// <param name="databaseName">Initial catalog; left unset when blank.</param>
/// <param name="userToken">AAD user token; left unset when blank.</param>
/// <param name="applicationClientId">AAD application client id; left unset when blank.</param>
/// <param name="applicationKey">AAD application key; left unset when blank.</param>
/// <returns>The populated connection string builder.</returns>
private KustoConnectionStringBuilder GetKustoConnectionStringBuilder(string clusterName, string databaseName,
    string userToken, string applicationClientId, string applicationKey)
{
    bool hasUserToken = !string.IsNullOrWhiteSpace(userToken);
    bool hasClientId = !string.IsNullOrWhiteSpace(applicationClientId);
    bool hasApplicationKey = !string.IsNullOrWhiteSpace(applicationKey);

    ValidationUtils.IsNotNull(clusterName, nameof(clusterName));
    ValidationUtils.IsTrue(
        hasUserToken || (hasClientId && hasApplicationKey),
        $"the Kusto authentication is not specified - either set {nameof(userToken)}, or set {nameof(applicationClientId)} and {nameof(applicationKey)}");

    var builder = new KustoConnectionStringBuilder();
    builder.DataSource = clusterName;
    // Perform federated auth based on the AAD user token, or based on the AAD application client id and key.
    builder.FederatedSecurity = true;

    // Optional settings are only written when a non-blank value was supplied.
    if (!string.IsNullOrWhiteSpace(databaseName))
    {
        builder.InitialCatalog = databaseName;
    }

    if (hasUserToken)
    {
        builder.UserToken = userToken;
    }

    if (hasClientId)
    {
        builder.ApplicationClientId = applicationClientId;
    }

    if (hasApplicationKey)
    {
        builder.ApplicationKey = applicationKey;
    }

    return builder;
}
/// <summary>
/// Asks the cluster for its databases and returns the name of the first one listed.
/// Used by the constructor when the connection string does not name a database.
/// </summary>
/// <returns>Database name, or an empty string when the query returns no rows.</returns>
private string GetFirstDatabaseName()
{
    const string query = ".show databases | project DatabaseName";

    // Both the token source and the reader are released once the first row has been read.
    using (var source = new CancellationTokenSource())
    using (var reader = ExecuteQuery(query, source.Token))
    {
        var firstRow = reader.ToEnumerable()?.FirstOrDefault();
        return firstRow?[0].ToString() ?? string.Empty;
    }
}
/// <summary>
/// Splits <paramref name="query"/> into code blocks, executes every block in parallel under a
/// single client request id and returns one reader wrapping all block results.
/// </summary>
/// <param name="query">Query text; must not be null or whitespace.</param>
/// <param name="cancellationToken">When cancelled, a ".cancel query" command is issued for this request id.</param>
/// <param name="databaseName">Database to run against; cluster-level queries are sent with an empty database name instead.</param>
/// <param name="retryCount">How many more times to refresh the token and retry after a 401 (Unauthorized) failure.</param>
/// <returns>A KustoResultsReader over the readers of all blocks, in block order.</returns>
public IDataReader ExecuteQuery(string query, CancellationToken cancellationToken, string databaseName = null, int retryCount = 1)
{
    ValidationUtils.IsArgumentNotNullOrWhiteSpace(query, nameof(query));

    var clientRequestProperties = new ClientRequestProperties
    {
        ClientRequestId = Guid.NewGuid().ToString()
    };
    clientRequestProperties.SetOption(ClientRequestProperties.OptionNoTruncation, true);

    // The registration is kept so that a failed attempt can remove its callback; on success
    // it stays in place because the returned reader may still be consumed by the caller.
    CancellationTokenRegistration cancelRegistration =
        cancellationToken.Register(() => CancelQuery(clientRequestProperties.ClientRequestId));

    var script = CodeScript.From(query, GlobalState.Default);
    IDataReader[] origReaders = new IDataReader[script.Blocks.Count];
    try
    {
        Parallel.ForEach(script.Blocks, (codeBlock, state, index) =>
        {
            var minimalQuery =
                codeBlock.Service.GetMinimalText(MinimalTextKind.RemoveLeadingWhitespaceAndComments);
            IDataReader origReader = _kustoQueryProvider.ExecuteQuery(
                KustoQueryUtils.IsClusterLevelQuery(minimalQuery) ? "" : databaseName,
                minimalQuery,
                clientRequestProperties);
            origReaders[index] = origReader;
        });
        return new KustoResultsReader(origReaders);
    }
    catch (AggregateException exception)
        when (retryCount > 0 &&
              exception.InnerException is KustoRequestException innerException
              && innerException.FailureCode == 401) // Unauthorized
    {
        // Drop everything belonging to the failed attempt before starting over, otherwise
        // readers opened by the blocks that did succeed leak and a stale cancel callback
        // for the dead request id stays attached to the token.
        ReleaseFailedAttempt(cancelRegistration, origReaders);
        RefreshAzureToken();
        retryCount--;
        return ExecuteQuery(query, cancellationToken, databaseName, retryCount);
    }
    catch
    {
        // Any other failure is propagated unchanged, but not before releasing what was opened.
        ReleaseFailedAttempt(cancelRegistration, origReaders);
        throw;
    }
}

// Unregisters the cancel callback of a failed attempt and disposes the readers it managed to open.
private static void ReleaseFailedAttempt(CancellationTokenRegistration cancelRegistration, IDataReader[] readers)
{
    cancelRegistration.Dispose();
    foreach (IDataReader reader in readers)
    {
        reader?.Dispose();
    }
}
/// <summary>
/// Executes a query or command against a kusto cluster and returns a sequence of result row instances.
/// The command is run against the current DatabaseName and only the primary result table is read.
/// </summary>
/// <param name="command">The query or control command text.</param>
/// <param name="throwOnError">When false, any exception is swallowed and null is returned instead.</param>
/// <param name="cancellationToken">Token forwarded to ExecuteQueryAsync.</param>
/// <returns>An ObjectReader over the primary result table, or null when the call failed and throwOnError is false.</returns>
// NOTE(review): the return type and the ObjectReader construction appear to have lost their
// generic type arguments in this view - confirm against the original file.
public async Task> ExecuteControlCommandAsync(string command, bool throwOnError,
CancellationToken cancellationToken)
{
try
{
var resultReader = await ExecuteQueryAsync(command, cancellationToken, DatabaseName);
// NOTE(review): resultReader is not disposed in this method - verify whether ParseV1 takes ownership of it.
var results = KustoDataReaderParser.ParseV1(resultReader, null);
// Single() requires exactly one primary result; anything else raises and is handled by the filter below.
var tableReader = results[WellKnownDataSet.PrimaryResult].Single().TableData.CreateDataReader();
return new ObjectReader(tableReader);
}
// The filter only catches when the caller opted out of exceptions; otherwise the error propagates.
catch (Exception) when (!throwOnError)
{
return null;
}
}
/// <summary>
/// Task-returning wrapper around ExecuteQuery. The query is executed synchronously on the
/// calling thread and the resulting reader is handed back in an already-completed task.
/// </summary>
/// <param name="query">The query.</param>
/// <param name="cancellationToken">Token forwarded to ExecuteQuery.</param>
/// <param name="databaseName">Database to run the query against; forwarded to ExecuteQuery.</param>
/// <returns>The results.</returns>
public Task ExecuteQueryAsync(string query, CancellationToken cancellationToken,
    string databaseName = null)
{
    return Task.FromResult(ExecuteQuery(query, cancellationToken, databaseName));
}
/// <summary>
/// Issues a ".cancel query" control command for the request with the given client request id.
/// </summary>
/// <param name="clientRequestId">Client request id of the query to cancel.</param>
private void CancelQuery(string clientRequestId)
{
    ExecuteControlCommand($".cancel query \"{clientRequestId}\"");
}
/// <summary>
/// Executes a Kusto control command and discards its output.
/// </summary>
/// <param name="command">The command; must not be null or whitespace.</param>
/// <param name="retryCount">How many more times to refresh the token and retry after a 401 (Unauthorized) failure.</param>
public void ExecuteControlCommand(string command, int retryCount = 1)
{
    ValidationUtils.IsArgumentNotNullOrWhiteSpace(command, nameof(command));

    while (true)
    {
        try
        {
            // The output is not needed; it is released immediately.
            _kustoAdminProvider.ExecuteControlCommand(command, null)?.Dispose();
            return;
        }
        catch (KustoRequestException exception) when (retryCount > 0 && exception.FailureCode == 401) // Unauthorized
        {
            // Get a fresh token and go around again; once retryCount reaches zero the
            // filter no longer matches and the exception propagates to the caller.
            RefreshAzureToken();
            retryCount--;
        }
    }
}
/// <summary>
/// Switches this client to another database and reloads the intellisense schema for it.
/// </summary>
/// <param name="databaseName">Name of the database to switch to.</param>
public void UpdateDatabase(string databaseName)
{
    // LoadSchemaState reads DatabaseName, so the switch has to happen before the reload.
    DatabaseName = databaseName;

    GlobalState refreshedSchema = LoadSchemaState();
    SchemaState = refreshedSchema;
}
/// <summary>
/// Releases the query provider and then the admin provider.
/// </summary>
public void Dispose()
{
    IDisposable[] providers = { _kustoQueryProvider, _kustoAdminProvider };
    foreach (IDisposable provider in providers)
    {
        provider.Dispose();
    }
}
}
}