diff --git a/Packages.props b/Packages.props
index fe95c60a..4f8fd684 100644
--- a/Packages.props
+++ b/Packages.props
@@ -32,5 +32,6 @@
+
\ No newline at end of file
diff --git a/bin/nuget/Microsoft.SqlServer.TransactSql.ScriptDom.NRT.1.2.65626.134.nupkg b/bin/nuget/Microsoft.SqlServer.TransactSql.ScriptDom.NRT.1.2.65626.134.nupkg
new file mode 100644
index 00000000..9e0c86e7
Binary files /dev/null and b/bin/nuget/Microsoft.SqlServer.TransactSql.ScriptDom.NRT.1.2.65626.134.nupkg differ
diff --git a/global.json b/global.json
index 6cf68d50..076dce50 100644
--- a/global.json
+++ b/global.json
@@ -1,5 +1,5 @@
{
"sdk": {
- "version": "3.1.302"
+ "version": "3.1.403"
}
}
\ No newline at end of file
diff --git a/src/Microsoft.SqlTools.ServiceLayer/DacFx/Contracts/ValidateStreamingJobParams.cs b/src/Microsoft.SqlTools.ServiceLayer/DacFx/Contracts/ValidateStreamingJobParams.cs
new file mode 100644
index 00000000..b35ae57b
--- /dev/null
+++ b/src/Microsoft.SqlTools.ServiceLayer/DacFx/Contracts/ValidateStreamingJobParams.cs
@@ -0,0 +1,44 @@
+//
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+//
+using System.Collections.Generic;
+using Microsoft.SqlTools.Hosting.Protocol.Contracts;
+using Microsoft.SqlTools.ServiceLayer.SchemaCompare.Contracts;
+using Microsoft.SqlTools.ServiceLayer.Utility;
+
+namespace Microsoft.SqlTools.ServiceLayer.DacFx.Contracts
+{
+ ///
+ /// Parameters for a Validate Streaming Job request.
+ ///
+ public class ValidateStreamingJobParams
+ {
+ ///
+ /// Gets or sets the package file path
+ ///
+ public string PackageFilePath { get; set; }
+
+ ///
+ /// Gets or sets the create streaming job TSQL. Should not be used if Statement is set.
+ ///
+ public string CreateStreamingJobTsql { get; set;}
+ }
+
+ ///
+ /// Parameters returned from a DacFx validate streaming job request.
+ ///
+ public class ValidateStreamingJobResult : ResultStatus
+ {
+
+ }
+
+ ///
+ /// Defines the DacFx validate streaming job request type
+ ///
+ class ValidateStreamingJobRequest
+ {
+ public static readonly RequestType Type =
+ RequestType.Create("dacfx/validateStreamingJob");
+ }
+}
diff --git a/src/Microsoft.SqlTools.ServiceLayer/DacFx/DacFxService.cs b/src/Microsoft.SqlTools.ServiceLayer/DacFx/DacFxService.cs
index a4717ba3..f24ef689 100644
--- a/src/Microsoft.SqlTools.ServiceLayer/DacFx/DacFxService.cs
+++ b/src/Microsoft.SqlTools.ServiceLayer/DacFx/DacFxService.cs
@@ -47,6 +47,7 @@ namespace Microsoft.SqlTools.ServiceLayer.DacFx
serviceHost.SetRequestHandler(GenerateDeployScriptRequest.Type, this.HandleGenerateDeployScriptRequest);
serviceHost.SetRequestHandler(GenerateDeployPlanRequest.Type, this.HandleGenerateDeployPlanRequest);
serviceHost.SetRequestHandler(GetOptionsFromProfileRequest.Type, this.HandleGetOptionsFromProfileRequest);
+ serviceHost.SetRequestHandler(ValidateStreamingJobRequest.Type, this.HandleValidateStreamingJobRequest);
}
///
@@ -256,6 +257,25 @@ namespace Microsoft.SqlTools.ServiceLayer.DacFx
}
}
+ ///
+ /// Handles request to validate an ASA streaming job
+ ///
+ ///
+ public async Task HandleValidateStreamingJobRequest(ValidateStreamingJobParams parameters, RequestContext requestContext)
+ {
+ try
+ {
+ ValidateStreamingJobOperation operation = new ValidateStreamingJobOperation(parameters);
+ ValidateStreamingJobResult result = operation.ValidateQuery();
+
+ await requestContext.SendResult(result);
+ }
+ catch (Exception e)
+ {
+ await requestContext.SendError(e);
+ }
+ }
+
private void ExecuteOperation(DacFxOperation operation, DacFxParams parameters, string taskName, RequestContext requestContext)
{
Task.Run(async () =>
diff --git a/src/Microsoft.SqlTools.ServiceLayer/DacFx/ValidateStreamingJobOperation.cs b/src/Microsoft.SqlTools.ServiceLayer/DacFx/ValidateStreamingJobOperation.cs
new file mode 100644
index 00000000..3b95792d
--- /dev/null
+++ b/src/Microsoft.SqlTools.ServiceLayer/DacFx/ValidateStreamingJobOperation.cs
@@ -0,0 +1,137 @@
+//
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+//
+
+extern alias ASAScriptDom;
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using Microsoft.SqlServer.Dac.Model;
+using Microsoft.SqlServer.TransactSql.ScriptDom;
+using Microsoft.SqlTools.ServiceLayer.DacFx.Contracts;
+using Microsoft.SqlTools.Utility;
+
+using ASA = ASAScriptDom::Microsoft.SqlServer.TransactSql.ScriptDom;
+
+namespace Microsoft.SqlTools.ServiceLayer.DacFx
+{
+ ///
+ /// Class to represent a validate streaming job operation
+ ///
+ class ValidateStreamingJobOperation
+ {
+ public ValidateStreamingJobParams Parameters { get; }
+
+ public ValidateStreamingJobOperation(ValidateStreamingJobParams parameters)
+ {
+ Validate.IsNotNull("parameters", parameters);
+ this.Parameters = parameters;
+ }
+
+ ///
+ /// Validates the transformation query/statement for a streaming job against the model contained in a dacpac
+ ///
+ ///
+ public ValidateStreamingJobResult ValidateQuery()
+ {
+ try
+ {
+ TSqlModel model = TSqlModel.LoadFromDacpac(Parameters.PackageFilePath, new ModelLoadOptions(SqlServer.Dac.DacSchemaModelStorageType.Memory, loadAsScriptBackedModel: true));
+
+ (string name, string statement) = ExtractStreamingJobData(Parameters.CreateStreamingJobTsql); // extract the streaming job's name and statement
+ ASA::ParseResult referencedStreams = ParseStatement(statement); // parse the input and output streams from the statement
+
+ // Match up the referenced streams with the External Streams contained in the model
+
+ List streams = model.GetObjects(DacQueryScopes.Default, ExternalStream.TypeClass).ToList();
+ HashSet identifiers = streams.Select(x => x.Name.Parts[^1]).ToHashSet();
+
+ List errors = new List();
+
+ foreach (ASA::SchemaObjectName stream in referencedStreams.Inputs.Values)
+ {
+ if (!identifiers.Contains(stream.BaseIdentifier.Value))
+ {
+ errors.Add(SR.StreamNotFoundInModel(SR.Input, stream.BaseIdentifier.Value));
+ }
+ }
+
+ foreach (ASA::SchemaObjectName stream in referencedStreams.Outputs.Values)
+ {
+ if (!identifiers.Contains(stream.BaseIdentifier.Value))
+ {
+ errors.Add(SR.StreamNotFoundInModel(SR.Output, stream.BaseIdentifier.Value));
+ }
+ }
+
+ return new ValidateStreamingJobResult()
+ {
+ Success = errors.Count == 0,
+ ErrorMessage = errors.Count == 0 ? null : SR.StreamingJobValidationFailed(name) + Environment.NewLine + String.Join(Environment.NewLine, errors)
+ };
+ }
+ catch (Exception ex)
+ {
+ return new ValidateStreamingJobResult()
+ {
+ Success = false,
+ ErrorMessage = ex.Message
+ };
+ }
+ }
+
+ ///
+ /// Extracts the streaming job's name and transformation statement/query from the TSQL script
+ ///
+ ///
+ ///
+ private (string JobName, string JobStatement) ExtractStreamingJobData(string createStreamingJobTsql)
+ {
+ TSqlParser parser = new TSql150Parser(initialQuotedIdentifiers: true);
+
+ TSqlFragment fragment = parser.Parse(new StringReader(createStreamingJobTsql), out IList errors);
+
+ if (((TSqlScript)fragment).Batches.Count != 1)
+ {
+ throw new ArgumentException(SR.FragmentShouldHaveOnlyOneBatch);
+ }
+
+ TSqlBatch batch = ((TSqlScript)fragment).Batches[0];
+ TSqlStatement statement = batch.Statements[0];
+
+ CreateExternalStreamingJobStatement createStatement = statement as CreateExternalStreamingJobStatement;
+
+ // if the TSQL doesn't contain a CreateExternalStreamingJobStatement, we're in a bad path.
+
+ if (createStatement == null)
+ {
+ throw new ArgumentException(SR.NoCreateStreamingJobStatementFound);
+ }
+
+ return (createStatement.Name.Value, createStatement.Statement.Value);
+ }
+
+ private ASA::ParseResult ParseStatement(string query)
+ {
+ ASA::TSqlNRTParser parser = new ASA::TSqlNRTParser(initialQuotedIdentifiers: true);
+ ASA::ParseResult result;
+
+ try
+ {
+ ASA::TSqlFragmentExtensions.Parse(parser, new StringReader(query), out result);
+ }
+ catch (Exception arg)
+ {
+ Console.WriteLine($"Failed to parse query. [{arg}]");
+ throw;
+ }
+
+ return result;
+ }
+ }
+}
+
diff --git a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.cs b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.cs
index 0bb8c7f2..4b7523d8 100644
--- a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.cs
+++ b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.cs
@@ -2949,6 +2949,14 @@ namespace Microsoft.SqlTools.ServiceLayer
}
}
+ public static string ValidateStreamingJobTaskName
+ {
+ get
+ {
+ return Keys.GetString(Keys.ValidateStreamingJobTaskName);
+ }
+ }
+
public static string ExtractInvalidVersion
{
get
@@ -2957,6 +2965,38 @@ namespace Microsoft.SqlTools.ServiceLayer
}
}
+ public static string Input
+ {
+ get
+ {
+ return Keys.GetString(Keys.Input);
+ }
+ }
+
+ public static string Output
+ {
+ get
+ {
+ return Keys.GetString(Keys.Output);
+ }
+ }
+
+ public static string FragmentShouldHaveOnlyOneBatch
+ {
+ get
+ {
+ return Keys.GetString(Keys.FragmentShouldHaveOnlyOneBatch);
+ }
+ }
+
+ public static string NoCreateStreamingJobStatementFound
+ {
+ get
+ {
+ return Keys.GetString(Keys.NoCreateStreamingJobStatementFound);
+ }
+ }
+
public static string PublishChangesTaskName
{
get
@@ -3223,6 +3263,16 @@ namespace Microsoft.SqlTools.ServiceLayer
return Keys.GetString(Keys.ScheduleNameAlreadyExists, scheduleName);
}
+ public static string StreamNotFoundInModel(string streamType, string missingStreamName)
+ {
+ return Keys.GetString(Keys.StreamNotFoundInModel, streamType, missingStreamName);
+ }
+
+ public static string StreamingJobValidationFailed(string jobName)
+ {
+ return Keys.GetString(Keys.StreamingJobValidationFailed, jobName);
+ }
+
public static string SqlAssessmentUnsuppoertedEdition(int editionCode)
{
return Keys.GetString(Keys.SqlAssessmentUnsuppoertedEdition, editionCode);
@@ -4457,9 +4507,30 @@ namespace Microsoft.SqlTools.ServiceLayer
public const string ProjectExtractTaskName = "ProjectExtractTaskName";
+ public const string ValidateStreamingJobTaskName = "ValidateStreamingJobTaskName";
+
+
public const string ExtractInvalidVersion = "ExtractInvalidVersion";
+ public const string StreamNotFoundInModel = "StreamNotFoundInModel";
+
+
+ public const string Input = "Input";
+
+
+ public const string Output = "Output";
+
+
+ public const string StreamingJobValidationFailed = "StreamingJobValidationFailed";
+
+
+ public const string FragmentShouldHaveOnlyOneBatch = "FragmentShouldHaveOnlyOneBatch";
+
+
+ public const string NoCreateStreamingJobStatementFound = "NoCreateStreamingJobStatementFound";
+
+
public const string PublishChangesTaskName = "PublishChangesTaskName";
diff --git a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.resx b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.resx
index 8714a121..06276e20 100644
--- a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.resx
+++ b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.resx
@@ -1787,10 +1787,40 @@
Extract project files
+
+ Validate streaming job
+
+
Invalid version '{0}' passed. Version must be in the format x.x.x.x where x is a number.
+
+ Streaming query statement contains a reference to missing {0} stream '{1}'. You must add it to the database model.
+ .
+ Parameters: 0 - streamType (string), 1 - missingStreamName (string)
+
+
+ input
+
+
+
+ output
+
+
+
+ Validation for external streaming job '{0}' failed:
+ .
+ Parameters: 0 - jobName (string)
+
+
+ TSQL fragment should contain exactly one batch.
+
+
+
+ No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).
+
+
Apply schema compare changes
diff --git a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.strings b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.strings
index 8ec8d189..91dbb106 100644
--- a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.strings
+++ b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.strings
@@ -826,7 +826,14 @@ ExtractDacpacTaskName = Extract dacpac
DeployDacpacTaskName = Deploy dacpac
GenerateScriptTaskName = Generate script
ProjectExtractTaskName = Extract project files
+ValidateStreamingJobTaskName = Validate streaming job
ExtractInvalidVersion = Invalid version '{0}' passed. Version must be in the format x.x.x.x where x is a number.
+StreamNotFoundInModel(string streamType, string missingStreamName) = Streaming query statement contains a reference to missing {0} stream '{1}'. You must add it to the database model.
+Input = input
+Output = output
+StreamingJobValidationFailed(string jobName) = Validation for external streaming job '{0}' failed:
+FragmentShouldHaveOnlyOneBatch = TSQL fragment should contain exactly one batch.
+NoCreateStreamingJobStatementFound = No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).
############################################################################
# Schema Compare
diff --git a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.xlf b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.xlf
index d53556f2..357afdc9 100644
--- a/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.xlf
+++ b/src/Microsoft.SqlTools.ServiceLayer/Localization/sr.xlf
@@ -2124,6 +2124,43 @@
Extract project files
+
+ Validate streaming job
+ Validate streaming job
+
+
+
+ Streaming query statement contains a reference to missing {0} stream '{1}'. You must add it to the database model.
+ Streaming query statement contains a reference to missing {0} stream '{1}'. You must add it to the database model.
+ .
+ Parameters: 0 - streamType (string), 1 - missingStreamName (string)
+
+
+ input
+ input
+
+
+
+ output
+ output
+
+
+
+ Validation for external streaming job '{0}' failed:
+ Validation for external streaming job '{0}' failed:
+ .
+ Parameters: 0 - jobName (string)
+
+
+ TSQL fragment should contain exactly one batch.
+ TSQL fragment should contain exactly one batch.
+
+
+
+ No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).
+ No External Streaming Job creation TSQL found (EXEC sp_create_streaming_job statement).
+
+