Adding External Streaming Job I/O validation to DacFxService (#1106)

* checkpoint

* Not having cake, nor eating it

* Working

* Swapping external dll for nupkg

* Extracting statement out of full TSQL

* Improving error message

* Fixing filename capitalization

* Reverting csproj changes

* Adding updated sr.cs file

* VS lost tracking on strings file?

* PR feedback

* resx additions

* More updated string files

* Swapped nuget for dll

* Revert "Swapped nuget for dll"

This reverts commit 6013f3fadf58ebc7e3590a46811d9fd9fc3eaa4a.

* Bumped netcore version to pull in support for extern aliasing nugets
This commit is contained in:
Benjin Dubishar
2020-11-02 12:03:14 -08:00
committed by GitHub
parent d94e691b07
commit 65c4fc01aa
14 changed files with 431 additions and 9 deletions

View File

@@ -32,5 +32,6 @@
<PackageReference Update="xunit" Version="2.4.1" /> <PackageReference Update="xunit" Version="2.4.1" />
<PackageReference Update="xunit.runner.visualstudio" Version="2.4.1" /> <PackageReference Update="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="16.6.1" /> <PackageReference Update="Microsoft.NET.Test.Sdk" Version="16.6.1" />
<PackageReference Update="Microsoft.SqlServer.TransactSql.ScriptDom.NRT" Version="1.2.65626.134" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@@ -1,5 +1,5 @@
{ {
"sdk": { "sdk": {
"version": "3.1.302" "version": "3.1.403"
} }
} }

View File

@@ -0,0 +1,44 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System.Collections.Generic;
using Microsoft.SqlTools.Hosting.Protocol.Contracts;
using Microsoft.SqlTools.ServiceLayer.SchemaCompare.Contracts;
using Microsoft.SqlTools.ServiceLayer.Utility;
namespace Microsoft.SqlTools.ServiceLayer.DacFx.Contracts
{
/// <summary>
/// Parameters for a Validate Streaming Job request.
/// </summary>
public class ValidateStreamingJobParams
{
/// <summary>
/// Gets or sets the package file path
/// </summary>
public string PackageFilePath { get; set; }
/// <summary>
/// Gets or sets the create streaming job TSQL. Should not be used if Statement is set.
/// </summary>
public string CreateStreamingJobTsql { get; set;}
}
/// <summary>
/// Parameters returned from a DacFx validate streaming job request.
/// </summary>
public class ValidateStreamingJobResult : ResultStatus
{
}
/// <summary>
/// Defines the DacFx validate streaming job request type
/// </summary>
class ValidateStreamingJobRequest
{
public static readonly RequestType<ValidateStreamingJobParams, ValidateStreamingJobResult> Type =
RequestType<ValidateStreamingJobParams, ValidateStreamingJobResult>.Create("dacfx/validateStreamingJob");
}
}

View File

@@ -47,6 +47,7 @@ namespace Microsoft.SqlTools.ServiceLayer.DacFx
serviceHost.SetRequestHandler(GenerateDeployScriptRequest.Type, this.HandleGenerateDeployScriptRequest); serviceHost.SetRequestHandler(GenerateDeployScriptRequest.Type, this.HandleGenerateDeployScriptRequest);
serviceHost.SetRequestHandler(GenerateDeployPlanRequest.Type, this.HandleGenerateDeployPlanRequest); serviceHost.SetRequestHandler(GenerateDeployPlanRequest.Type, this.HandleGenerateDeployPlanRequest);
serviceHost.SetRequestHandler(GetOptionsFromProfileRequest.Type, this.HandleGetOptionsFromProfileRequest); serviceHost.SetRequestHandler(GetOptionsFromProfileRequest.Type, this.HandleGetOptionsFromProfileRequest);
serviceHost.SetRequestHandler(ValidateStreamingJobRequest.Type, this.HandleValidateStreamingJobRequest);
} }
/// <summary> /// <summary>
@@ -256,6 +257,25 @@ namespace Microsoft.SqlTools.ServiceLayer.DacFx
} }
} }
/// <summary>
/// Handles request to validate an ASA streaming job
/// </summary>
/// <returns></returns>
public async Task HandleValidateStreamingJobRequest(ValidateStreamingJobParams parameters, RequestContext<ValidateStreamingJobResult> requestContext)
{
try
{
ValidateStreamingJobOperation operation = new ValidateStreamingJobOperation(parameters);
ValidateStreamingJobResult result = operation.ValidateQuery();
await requestContext.SendResult(result);
}
catch (Exception e)
{
await requestContext.SendError(e);
}
}
private void ExecuteOperation(DacFxOperation operation, DacFxParams parameters, string taskName, RequestContext<DacFxResult> requestContext) private void ExecuteOperation(DacFxOperation operation, DacFxParams parameters, string taskName, RequestContext<DacFxResult> requestContext)
{ {
Task.Run(async () => Task.Run(async () =>

View File

@@ -0,0 +1,137 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
extern alias ASAScriptDom;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using Microsoft.SqlServer.Dac.Model;
using Microsoft.SqlServer.TransactSql.ScriptDom;
using Microsoft.SqlTools.ServiceLayer.DacFx.Contracts;
using Microsoft.SqlTools.Utility;
using ASA = ASAScriptDom::Microsoft.SqlServer.TransactSql.ScriptDom;
namespace Microsoft.SqlTools.ServiceLayer.DacFx
{
/// <summary>
/// Class to represent a validate streaming job operation
/// </summary>
class ValidateStreamingJobOperation
{
public ValidateStreamingJobParams Parameters { get; }
public ValidateStreamingJobOperation(ValidateStreamingJobParams parameters)
{
Validate.IsNotNull("parameters", parameters);
this.Parameters = parameters;
}
/// <summary>
/// Validates the transformation query/statement for a streaming job against the model contained in a dacpac
/// </summary>
/// <returns></returns>
public ValidateStreamingJobResult ValidateQuery()
{
try
{
TSqlModel model = TSqlModel.LoadFromDacpac(Parameters.PackageFilePath, new ModelLoadOptions(SqlServer.Dac.DacSchemaModelStorageType.Memory, loadAsScriptBackedModel: true));
(string name, string statement) = ExtractStreamingJobData(Parameters.CreateStreamingJobTsql); // extract the streaming job's name and statement
ASA::ParseResult referencedStreams = ParseStatement(statement); // parse the input and output streams from the statement
// Match up the referenced streams with the External Streams contained in the model
List<TSqlObject> streams = model.GetObjects(DacQueryScopes.Default, ExternalStream.TypeClass).ToList();
HashSet<string> identifiers = streams.Select(x => x.Name.Parts[^1]).ToHashSet();
List<string> errors = new List<string>();
foreach (ASA::SchemaObjectName stream in referencedStreams.Inputs.Values)
{
if (!identifiers.Contains(stream.BaseIdentifier.Value))
{
errors.Add(SR.StreamNotFoundInModel(SR.Input, stream.BaseIdentifier.Value));
}
}
foreach (ASA::SchemaObjectName stream in referencedStreams.Outputs.Values)
{
if (!identifiers.Contains(stream.BaseIdentifier.Value))
{
errors.Add(SR.StreamNotFoundInModel(SR.Output, stream.BaseIdentifier.Value));
}
}
return new ValidateStreamingJobResult()
{
Success = errors.Count == 0,
ErrorMessage = errors.Count == 0 ? null : SR.StreamingJobValidationFailed(name) + Environment.NewLine + String.Join(Environment.NewLine, errors)
};
}
catch (Exception ex)
{
return new ValidateStreamingJobResult()
{
Success = false,
ErrorMessage = ex.Message
};
}
}
/// <summary>
/// Extracts the streaming job's name and transformation statement/query from the TSQL script
/// </summary>
/// <param name="createStreamingJobTsql"></param>
/// <returns></returns>
private (string JobName, string JobStatement) ExtractStreamingJobData(string createStreamingJobTsql)
{
TSqlParser parser = new TSql150Parser(initialQuotedIdentifiers: true);
TSqlFragment fragment = parser.Parse(new StringReader(createStreamingJobTsql), out IList<ParseError> errors);
if (((TSqlScript)fragment).Batches.Count != 1)
{
throw new ArgumentException(SR.FragmentShouldHaveOnlyOneBatch);
}
TSqlBatch batch = ((TSqlScript)fragment).Batches[0];
TSqlStatement statement = batch.Statements[0];
CreateExternalStreamingJobStatement createStatement = statement as CreateExternalStreamingJobStatement;
// if the TSQL doesn't contain a CreateExternalStreamingJobStatement, we're in a bad path.
if (createStatement == null)
{
throw new ArgumentException(SR.NoCreateStreamingJobStatementFound);
}
return (createStatement.Name.Value, createStatement.Statement.Value);
}
private ASA::ParseResult ParseStatement(string query)
{
ASA::TSqlNRTParser parser = new ASA::TSqlNRTParser(initialQuotedIdentifiers: true);
ASA::ParseResult result;
try
{
ASA::TSqlFragmentExtensions.Parse(parser, new StringReader(query), out result);
}
catch (Exception arg)
{
Console.WriteLine($"Failed to parse query. [{arg}]");
throw;
}
return result;
}
}
}

View File

@@ -2949,6 +2949,14 @@ namespace Microsoft.SqlTools.ServiceLayer
} }
} }
public static string ValidateStreamingJobTaskName
{
get
{
return Keys.GetString(Keys.ValidateStreamingJobTaskName);
}
}
public static string ExtractInvalidVersion public static string ExtractInvalidVersion
{ {
get get
@@ -2957,6 +2965,38 @@ namespace Microsoft.SqlTools.ServiceLayer
} }
} }
public static string Input
{
get
{
return Keys.GetString(Keys.Input);
}
}
public static string Output
{
get
{
return Keys.GetString(Keys.Output);
}
}
public static string FragmentShouldHaveOnlyOneBatch
{
get
{
return Keys.GetString(Keys.FragmentShouldHaveOnlyOneBatch);
}
}
public static string NoCreateStreamingJobStatementFound
{
get
{
return Keys.GetString(Keys.NoCreateStreamingJobStatementFound);
}
}
public static string PublishChangesTaskName public static string PublishChangesTaskName
{ {
get get
@@ -3223,6 +3263,16 @@ namespace Microsoft.SqlTools.ServiceLayer
return Keys.GetString(Keys.ScheduleNameAlreadyExists, scheduleName); return Keys.GetString(Keys.ScheduleNameAlreadyExists, scheduleName);
} }
public static string StreamNotFoundInModel(string streamType, string missingStreamName)
{
return Keys.GetString(Keys.StreamNotFoundInModel, streamType, missingStreamName);
}
public static string StreamingJobValidationFailed(string jobName)
{
return Keys.GetString(Keys.StreamingJobValidationFailed, jobName);
}
public static string SqlAssessmentUnsuppoertedEdition(int editionCode) public static string SqlAssessmentUnsuppoertedEdition(int editionCode)
{ {
return Keys.GetString(Keys.SqlAssessmentUnsuppoertedEdition, editionCode); return Keys.GetString(Keys.SqlAssessmentUnsuppoertedEdition, editionCode);
@@ -4457,9 +4507,30 @@ namespace Microsoft.SqlTools.ServiceLayer
public const string ProjectExtractTaskName = "ProjectExtractTaskName"; public const string ProjectExtractTaskName = "ProjectExtractTaskName";
public const string ValidateStreamingJobTaskName = "ValidateStreamingJobTaskName";
public const string ExtractInvalidVersion = "ExtractInvalidVersion"; public const string ExtractInvalidVersion = "ExtractInvalidVersion";
public const string StreamNotFoundInModel = "StreamNotFoundInModel";
public const string Input = "Input";
public const string Output = "Output";
public const string StreamingJobValidationFailed = "StreamingJobValidationFailed";
public const string FragmentShouldHaveOnlyOneBatch = "FragmentShouldHaveOnlyOneBatch";
public const string NoCreateStreamingJobStatementFound = "NoCreateStreamingJobStatementFound";
public const string PublishChangesTaskName = "PublishChangesTaskName"; public const string PublishChangesTaskName = "PublishChangesTaskName";

View File

@@ -1787,10 +1787,40 @@
<value>Extract project files</value> <value>Extract project files</value>
<comment></comment> <comment></comment>
</data> </data>
<data name="ValidateStreamingJobTaskName" xml:space="preserve">
<value>Validate streaming job</value>
<comment></comment>
</data>
<data name="ExtractInvalidVersion" xml:space="preserve"> <data name="ExtractInvalidVersion" xml:space="preserve">
<value>Invalid version &apos;{0}&apos; passed. Version must be in the format x.x.x.x where x is a number.</value> <value>Invalid version &apos;{0}&apos; passed. Version must be in the format x.x.x.x where x is a number.</value>
<comment></comment> <comment></comment>
</data> </data>
<data name="StreamNotFoundInModel" xml:space="preserve">
<value>Streaming query statement contains a reference to missing {0} stream &apos;{1}&apos;. You must add it to the database model.</value>
<comment>.
Parameters: 0 - streamType (string), 1 - missingStreamName (string) </comment>
</data>
<data name="Input" xml:space="preserve">
<value>input</value>
<comment></comment>
</data>
<data name="Output" xml:space="preserve">
<value>output</value>
<comment></comment>
</data>
<data name="StreamingJobValidationFailed" xml:space="preserve">
<value>Validation for external streaming job &apos;{0}&apos; failed:</value>
<comment>.
Parameters: 0 - jobName (string) </comment>
</data>
<data name="FragmentShouldHaveOnlyOneBatch" xml:space="preserve">
<value>TSQL fragment should contain exactly one batch.</value>
<comment></comment>
</data>
<data name="NoCreateStreamingJobStatementFound" xml:space="preserve">
<value>No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).</value>
<comment></comment>
</data>
<data name="PublishChangesTaskName" xml:space="preserve"> <data name="PublishChangesTaskName" xml:space="preserve">
<value>Apply schema compare changes</value> <value>Apply schema compare changes</value>
<comment></comment> <comment></comment>

View File

@@ -826,7 +826,14 @@ ExtractDacpacTaskName = Extract dacpac
DeployDacpacTaskName = Deploy dacpac DeployDacpacTaskName = Deploy dacpac
GenerateScriptTaskName = Generate script GenerateScriptTaskName = Generate script
ProjectExtractTaskName = Extract project files ProjectExtractTaskName = Extract project files
ValidateStreamingJobTaskName = Validate streaming job
ExtractInvalidVersion = Invalid version '{0}' passed. Version must be in the format x.x.x.x where x is a number. ExtractInvalidVersion = Invalid version '{0}' passed. Version must be in the format x.x.x.x where x is a number.
StreamNotFoundInModel(string streamType, string missingStreamName) = Streaming query statement contains a reference to missing {0} stream '{1}'. You must add it to the database model.
Input = input
Output = output
StreamingJobValidationFailed(string jobName) = Validation for external streaming job '{0}' failed:
FragmentShouldHaveOnlyOneBatch = TSQL fragment should contain exactly one batch.
NoCreateStreamingJobStatementFound = No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).
############################################################################ ############################################################################
# Schema Compare # Schema Compare

View File

@@ -2124,6 +2124,43 @@
<target state="new">Extract project files</target> <target state="new">Extract project files</target>
<note></note> <note></note>
</trans-unit> </trans-unit>
<trans-unit id="ValidateStreamingJobTaskName">
<source>Validate streaming job</source>
<target state="new">Validate streaming job</target>
<note></note>
</trans-unit>
<trans-unit id="StreamNotFoundInModel">
<source>Streaming query statement contains a reference to missing {0} stream '{1}'. You must add it to the database model.</source>
<target state="new">Streaming query statement contains a reference to missing {0} stream '{1}'. You must add it to the database model.</target>
<note>.
Parameters: 0 - streamType (string), 1 - missingStreamName (string) </note>
</trans-unit>
<trans-unit id="Input">
<source>input</source>
<target state="new">input</target>
<note></note>
</trans-unit>
<trans-unit id="Output">
<source>output</source>
<target state="new">output</target>
<note></note>
</trans-unit>
<trans-unit id="StreamingJobValidationFailed">
<source>Validation for external streaming job '{0}' failed:</source>
<target state="new">Validation for external streaming job '{0}' failed:</target>
<note>.
Parameters: 0 - jobName (string) </note>
</trans-unit>
<trans-unit id="FragmentShouldHaveOnlyOneBatch">
<source>TSQL fragment should contain exactly one batch.</source>
<target state="new">TSQL fragment should contain exactly one batch.</target>
<note></note>
</trans-unit>
<trans-unit id="NoCreateStreamingJobStatementFound">
<source>No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).</source>
<target state="new">No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).</target>
<note></note>
</trans-unit>
</body> </body>
</file> </file>
</xliff> </xliff>

View File

@@ -25,6 +25,10 @@
<PackageReference Include="System.Text.Encoding.CodePages" /> <PackageReference Include="System.Text.Encoding.CodePages" />
<PackageReference Include="Microsoft.SqlServer.Assessment" /> <PackageReference Include="Microsoft.SqlServer.Assessment" />
<PackageReference Include="Microsoft.SqlServer.Migration.Assessment" /> <PackageReference Include="Microsoft.SqlServer.Migration.Assessment" />
<PackageReference Include="System.Text.Encoding.CodePages" />
<PackageReference Include="Microsoft.SqlServer.TransactSql.ScriptDom.NRT">
<Aliases>ASAScriptDom</Aliases>
</PackageReference>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="**\*.cs" Exclude="**/obj/**/*.cs" /> <Compile Include="**\*.cs" Exclude="**/obj/**/*.cs" />

View File

@@ -62,6 +62,24 @@ AS
RETURN 0 RETURN 0
"; ";
private string dacpacsFolder = Path.Combine("..", "..", "..", "DacFx", "Dacpacs");
private string goodCreateStreamingJob = @"EXEC sys.sp_create_streaming_job @NAME = 'myJob', @STATEMENT = 'INSERT INTO SqlOutputStream SELECT
timeCreated,
machine.temperature as machine_temperature,
machine.pressure as machine_pressure,
ambient.temperature as ambient_temperature,
ambient.humidity as ambient_humidity
FROM EdgeHubInputStream'";
private string missingCreateBothStreamingJob = @$"EXEC sys.sp_create_streaming_job @NAME = 'myJob', @STATEMENT = 'INSERT INTO MissingSqlOutputStream SELECT
timeCreated,
machine.temperature as machine_temperature,
machine.pressure as machine_pressure,
ambient.temperature as ambient_temperature,
ambient.humidity as ambient_humidity
FROM MissingEdgeHubInputStream'";
private LiveConnectionHelper.TestConnectionResult GetLiveAutoCompleteTestObjects() private LiveConnectionHelper.TestConnectionResult GetLiveAutoCompleteTestObjects()
{ {
var result = LiveConnectionHelper.InitLiveConnectionInfo(); var result = LiveConnectionHelper.InitLiveConnectionInfo();
@@ -754,6 +772,56 @@ RETURN 0
dacfxRequestContext.VerifyAll(); dacfxRequestContext.VerifyAll();
} }
/// <summary>
/// Verify that streaming job
/// </summary>
/// <returns></returns>
[Test]
public async Task ValidateStreamingJob()
{
var dacfxRequestContext = new Mock<RequestContext<ValidateStreamingJobResult>>();
DacFxService service = new DacFxService();
ValidateStreamingJobResult expectedResult;
// Positive case: both input and output are present
expectedResult = new ValidateStreamingJobResult() { Success = true };
dacfxRequestContext.Setup((RequestContext<ValidateStreamingJobResult> x) => x.SendResult(It.Is<ValidateStreamingJobResult>((result) => ValidateStreamingJobErrors(expectedResult, result) == true))).Returns(Task.FromResult(new object()));
ValidateStreamingJobParams parameters = new ValidateStreamingJobParams()
{
PackageFilePath = Path.Combine(dacpacsFolder, "StreamingJobTestDb.dacpac"),
CreateStreamingJobTsql = goodCreateStreamingJob
};
await service.HandleValidateStreamingJobRequest(parameters, dacfxRequestContext.Object);
dacfxRequestContext.VerifyAll();
// Negative case: input and output streams are both missing from model
const string errorMessage = @"Validation for external streaming job 'myJob' failed:
Streaming query statement contains a reference to missing input stream 'MissingEdgeHubInputStream'. You must add it to the database model.
Streaming query statement contains a reference to missing output stream 'MissingSqlOutputStream'. You must add it to the database model.";
expectedResult = new ValidateStreamingJobResult() { Success = false, ErrorMessage = errorMessage };
dacfxRequestContext.Setup((RequestContext<ValidateStreamingJobResult> x) => x.SendResult(It.Is<ValidateStreamingJobResult>((result) => ValidateStreamingJobErrors(expectedResult, result)))).Returns(Task.FromResult(new object()));
parameters = new ValidateStreamingJobParams()
{
PackageFilePath = Path.Combine(dacpacsFolder, "StreamingJobTestDb.dacpac"),
CreateStreamingJobTsql = missingCreateBothStreamingJob
};
await service.HandleValidateStreamingJobRequest(parameters, dacfxRequestContext.Object);
dacfxRequestContext.VerifyAll();
}
private bool ValidateStreamingJobErrors(ValidateStreamingJobResult expected, ValidateStreamingJobResult actual)
{
return expected.Success == actual.Success
&& expected.ErrorMessage == actual.ErrorMessage;
}
private bool ValidateOptions(DeploymentOptions expected, DeploymentOptions actual) private bool ValidateOptions(DeploymentOptions expected, DeploymentOptions actual)
{ {
System.Reflection.PropertyInfo[] deploymentOptionsProperties = expected.GetType().GetProperties(); System.Reflection.PropertyInfo[] deploymentOptionsProperties = expected.GetType().GetProperties();

View File

@@ -17,9 +17,9 @@
<ProjectReference Include="../Microsoft.SqlTools.Test.CompletionExtension/Microsoft.SqlTools.Test.CompletionExtension.csproj" /> <ProjectReference Include="../Microsoft.SqlTools.Test.CompletionExtension/Microsoft.SqlTools.Test.CompletionExtension.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Moq" /> <PackageReference Include="Moq" />
<PackageReference Include="System.Net.Http"/> <PackageReference Include="System.Net.Http" />
<PackageReference Include="Microsoft.NET.Test.Sdk" /> <PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="nunit" /> <PackageReference Include="nunit" />
<PackageReference Include="nunit3testadapter" /> <PackageReference Include="nunit3testadapter" />
<PackageReference Include="nunit.console" /> <PackageReference Include="nunit.console" />
@@ -33,9 +33,12 @@
<ItemGroup> <ItemGroup>
<Content Remove=".\Agent\NotebookResources\TestNotebook.ipynb" /> <Content Remove=".\Agent\NotebookResources\TestNotebook.ipynb" />
<EmbeddedResource Include=".\Agent\NotebookResources\TestNotebook.ipynb" /> <EmbeddedResource Include=".\Agent\NotebookResources\TestNotebook.ipynb" />
<Content Include="..\..\src\Microsoft.SqlTools.ServiceLayer\Migration\Metadata\**"> <Content Include="..\..\src\Microsoft.SqlTools.ServiceLayer\Migration\Metadata\**">
<Link>%(RecursiveDir)%(Filename)%(Extension)</Link> <Link>%(RecursiveDir)%(Filename)%(Extension)</Link>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content> </Content>
</ItemGroup> </ItemGroup>
<ItemGroup>
<Folder Include="DacFx\Dacpacs\" />
</ItemGroup>
</Project> </Project>