Update to XElite (#1287)

* added Xevent.xelite to packages

* WIP on XELite conversion

* added wip changes

* added streaminfo class for preserving streams

* added list for profilerService

* added cancelXelStreamRequest.cs

* added test configuration for profilerservice

* added request handler using startprofilingrequest

* fix start profiling result

* added small connection string

* WIP branch for XElite (not functional)

* added hardcoded string with working stream

* added check for buildconnectionstring

* added back HandleXEvent

* added profilerservice eventsavailable test

* WIP change for profilersessionmonitor

* added more changes to profilersessionmonitor

* changed HandleXEvent

* added more additions to profielrSessionMonitor

* added startmonitoringstream

* added startmonitoringsession

* added startmonitoringstream to IProfilerSessionMonitor

* switch to monitoringStream

* added assignment of connectioninfo

* added conninfo

* added conninfo to iProfilerSessionMonitor.cs

* added isStreaming flag

* added token list

* added XEventSession name.

* removed polling lock

* test adding filters

* removed old profile filter as its incompatible

* added wip cancel feature in removesession

* moved cancellationtoken outside

* added backIsStreaming

* moved isstreaming around

* added  multiple events in list

* added timeout to handleXEvent

* removed timeout

* remove eventList count check

* remove old events filter

* returned eventlist

* remove old events filter

* renamed xelite handle function

* restored sqlclient version

* removed original handlestartprofilingrequest

* added monitoring stream to handlecreatexeventsessionrequest

* removed unnecessary sections from monitor

* Revert "removed unnecessary sections from monitor"

This reverts commit 91cadeebeeedfe99cec2e9c42944ba6716d95a61.

* added xevent actions to profileEvent

* removed polling lock for processStreams

* added filter for oldevents

* removed unused methods

* removed comment

* removed unnecessary class

* removed unnecessary requests

* removed outdated methods

* added work in progress cancellation task

* added profilersessionmonitor changes

* added small changes

* renamed startMonitoringStream

* more changes related to feedback

* made changes to code

* removed more polling code

* fixed tests

* added connectioninfo to testxeventsessions

* changed functions

* added back else

* small formatting fix

* more changes made

* added changes to XEventSession

* update to strings

* added changes to accomodate tests

* more changes

* added profilerservicetest for stopprofiling

* added TestStoppedSessionNotification test

* added session missing details handler

* simplified error message

* restored strings and added changes

* added auto-getter setter for IsStreaming

* added more changes

* removed unnecessary lines from test

* added more debugging messages

* made last changes

* added error message for handlestopprofilingrequest

* added more debug messages

* added back an s

* added verbose
This commit is contained in:
Alex Ma
2021-12-07 14:11:42 -08:00
committed by GitHub
parent faaf062f0f
commit 822a6459ce
15 changed files with 245 additions and 328 deletions

View File

@@ -30,6 +30,7 @@
<PackageReference Update="Microsoft.CodeAnalysis.CSharp" Version="3.10.0" />
<PackageReference Update="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="3.10.0" />
<PackageReference Update="Microsoft.CodeAnalysis.Workspaces.Common" Version="3.10.0" />
<PackageReference Update="Microsoft.SqlServer.XEvent.XELite" Version="2021.6.21.10" />
<PackageReference Update="Moq" Version="4.8.2" />
<PackageReference Update="nunit" Version="3.12.0" />

View File

@@ -8756,6 +8756,16 @@ namespace Microsoft.SqlTools.ServiceLayer
return Keys.GetString(Keys.SessionAlreadyExists, sessionName);
}
public static string SessionMissingDetails(int id)
{
return Keys.GetString(Keys.SessionMissingDetails, id);
}
public static string StartProfilingFailed(String error)
{
return Keys.GetString(Keys.StartProfilingFailed, error);
}
public static string UnknownSizeUnit(string unit)
{
return Keys.GetString(Keys.UnknownSizeUnit, unit);
@@ -9944,6 +9954,12 @@ namespace Microsoft.SqlTools.ServiceLayer
public const string SessionAlreadyExists = "SessionAlreadyExists";
public const string SessionMissingDetails = "SessionMissingDetails";
public const string StartProfilingFailed = "StartProfilingFailed";
public const string CategoryLocal = "CategoryLocal";

View File

@@ -1571,6 +1571,16 @@
<value>An XEvent session named {0} already exists</value>
<comment>.
Parameters: 0 - sessionName (String) </comment>
</data>
<data name="SessionMissingDetails" xml:space="preserve">
<value>Unable to start streaming session {0} due to missing session details.</value>
<comment>.
Parameters: 0 - id (int) </comment>
</data>
<data name="StartProfilingFailed" xml:space="preserve">
<value>Failed to start profiler: {0}</value>
<comment>.
Parameters: 0 - error (String) </comment>
</data>
<data name="CategoryLocal" xml:space="preserve">
<value>[Uncategorized (Local)]</value>

View File

@@ -759,7 +759,8 @@ PauseSessionFailed(String error) = Failed to pause session: {0}
StopSessionFailed(String error) = Failed to stop session: {0}
SessionNotFound = Cannot find requested XEvent session
SessionAlreadyExists(String sessionName) = An XEvent session named {0} already exists
SessionMissingDetails(int id) = Unable to start streaming session {0} due to missing session details.
StartProfilingFailed(String error) = Failed to start profiler: {0}
;job categories
CategoryLocal = [Uncategorized (Local)]

View File

@@ -5696,6 +5696,18 @@
<target state="new">Specifies whether the check constraint is Enabled</target>
<note></note>
</trans-unit>
<trans-unit id="SessionMissingDetails">
<source>Unable to start streaming session {0} due to missing session details.</source>
<target state="new">Unable to start streaming session {0} due to missing session details.</target>
<note>.
Parameters: 0 - id (int) </note>
</trans-unit>
<trans-unit id="StartProfilingFailed">
<source>Failed to start profiler: {0}</source>
<target state="new">Failed to start profiler: {0}</target>
<note>.
Parameters: 0 - error (String) </note>
</trans-unit>
</body>
</file>
</xliff>

View File

@@ -26,6 +26,7 @@
<PackageReference Include="Microsoft.CodeAnalysis.Workspaces.Common" />
<PackageReference Include="Microsoft.SqlServer.DACFx" />
<PackageReference Include="Microsoft.Data.SqlClient.AlwaysEncrypted.AzureKeyVaultProvider" />
<PackageReference Include="Microsoft.SqlServer.XEvent.XELite" />
<PackageReference Include="System.Text.Encoding.CodePages" />
<PackageReference Include="Microsoft.SqlServer.Assessment" />
<PackageReference Include="Microsoft.SqlServer.Migration.Assessment" />

View File

@@ -22,7 +22,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
/// <summary>
/// Starts monitoring a profiler session
/// </summary>
bool StartMonitoringSession(string viewerId, IXEventSession session);
void StartMonitoringSession(string viewerId, IXEventSession session);
/// <summary>
/// Stops monitoring a profiler session

View File

@@ -2,6 +2,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using Microsoft.SqlTools.ServiceLayer.Connection.Contracts;
using Microsoft.SqlServer.Management.XEvent;
namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
@@ -15,6 +17,16 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
/// </summary>
int Id { get; }
/// <summary>
/// Connection details associated with the session id.
/// </summary>
ConnectionDetails ConnectionDetails { get; set; }
/// <summary>
/// Session associated with the session id.
/// </summary>
Session Session { get; set; }
/// <summary>
/// Starts XEvent session
/// </summary>

View File

@@ -18,6 +18,7 @@ using Microsoft.SqlServer.Management.Sdk.Sfc;
using Microsoft.SqlServer.Management.Smo;
using Microsoft.SqlServer.Management.XEvent;
using Microsoft.SqlServer.Management.XEventDbScoped;
using Microsoft.SqlServer.XEvent.XELite;
using Microsoft.SqlTools.Hosting.Protocol;
using Microsoft.SqlTools.Hosting.Protocol.Contracts;
using Microsoft.SqlTools.ServiceLayer.Connection;
@@ -120,6 +121,42 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
this.SessionMonitor.AddSessionListener(this);
}
/// <summary>
/// Handle request to start a profiling session
/// </summary>
internal async Task HandleStartProfilingRequest(StartProfilingParams parameters, RequestContext<StartProfilingResult> requestContext)
{
await Task.Run(async () =>
{
try
{
Logger.Write(TraceEventType.Verbose, "HandleStartProfilingRequest started");
ConnectionInfo connInfo;
ConnectionServiceInstance.TryFindConnection(
parameters.OwnerUri,
out connInfo);
if (connInfo != null)
{
// create a new XEvent session and Profiler session
var xeSession = this.XEventSessionFactory.GetXEventSession(parameters.SessionName, connInfo);
monitor.StartMonitoringSession(parameters.OwnerUri, xeSession);
var result = new StartProfilingResult();
await requestContext.SendResult(result);
}
else
{
Logger.Write(TraceEventType.Error, "Connection Info could not be found for " + parameters.OwnerUri);
throw new Exception(SR.ProfilerConnectionNotFound);
}
}
catch (Exception e)
{
Logger.Write(TraceEventType.Error, "HandleStartProfilingRequest failed for uri " + parameters.OwnerUri);
await requestContext.SendError(new Exception (SR.StartProfilingFailed(e.Message), e));
}
});
}
/// <summary>
/// Handle request to start a profiling session
/// </summary>
@@ -129,20 +166,24 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
try
{
Logger.Write(TraceEventType.Verbose, "HandleCreateXEventSessionRequest started");
ConnectionInfo connInfo;
ConnectionServiceInstance.TryFindConnection(
parameters.OwnerUri,
out connInfo);
if (connInfo == null)
{
Logger.Write(TraceEventType.Error, "Connection Info could not be found for " + parameters.OwnerUri);
throw new Exception(SR.ProfilerConnectionNotFound);
}
else if (parameters.SessionName == null)
{
Logger.Write(TraceEventType.Error, "Session Name could not be found for " + parameters.OwnerUri);
throw new ArgumentNullException("SessionName");
}
else if (parameters.Template == null)
{
Logger.Write(TraceEventType.Error, "Template could not be found for " + parameters.OwnerUri);
throw new ArgumentNullException("Template");
}
else
@@ -156,10 +197,13 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
xeSession = this.XEventSessionFactory.GetXEventSession(parameters.SessionName, connInfo);
}
catch { }
catch {
Logger.Write(TraceEventType.Verbose, "Session with name '" + parameters.SessionName + "' was not found");
}
if (xeSession == null)
{
Logger.Write(TraceEventType.Verbose, "Creating new XEventSession with SessionName " + parameters.SessionName);
// create a new XEvent session and Profiler session
xeSession = this.XEventSessionFactory.CreateXEventSession(parameters.Template.CreateStatement, parameters.SessionName, connInfo);
}
@@ -175,42 +219,8 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
}
catch (Exception e)
{
await requestContext.SendError(new Exception(SR.CreateSessionFailed(e.Message)));
}
});
}
/// <summary>
/// Handle request to start a profiling session
/// </summary>
internal async Task HandleStartProfilingRequest(StartProfilingParams parameters, RequestContext<StartProfilingResult> requestContext)
{
await Task.Run(async () =>
{
try
{
ConnectionInfo connInfo;
ConnectionServiceInstance.TryFindConnection(
parameters.OwnerUri,
out connInfo);
if (connInfo != null)
{
// create a new XEvent session and Profiler session
var xeSession = this.XEventSessionFactory.GetXEventSession(parameters.SessionName, connInfo);
// start monitoring the profiler session
monitor.StartMonitoringSession(parameters.OwnerUri, xeSession);
var result = new StartProfilingResult();
await requestContext.SendResult(result);
}
else
{
throw new Exception(SR.ProfilerConnectionNotFound);
}
}
catch (Exception e)
{
await requestContext.SendError(new Exception(SR.StartSessionFailed(e.Message)));
Logger.Write(TraceEventType.Error, "HandleCreateXEventSessionRequest failed for uri " + parameters.OwnerUri);
await requestContext.SendError(new Exception (SR.CreateSessionFailed(e.Message), e));
}
});
}
@@ -224,6 +234,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
try
{
Logger.Write(TraceEventType.Verbose, "HandleStopProfilingRequest started");
ProfilerSession session;
monitor.StopMonitoringSession(parameters.OwnerUri, out session);
@@ -240,11 +251,12 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
await requestContext.SendResult(new StopProfilingResult { });
break;
}
catch (InvalidOperationException)
catch (InvalidOperationException e)
{
remainingAttempts--;
if (remainingAttempts == 0)
{
Logger.Write(TraceEventType.Error, "Stop profiler session '" + session.XEventSession.Session.Name + "' failed after three retries, last exception was: " + e.Message);
throw;
}
Thread.Sleep(500);
@@ -258,6 +270,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
}
catch (Exception e)
{
Logger.Write(TraceEventType.Error, "HandleStopProfilingRequest failed for uri " + parameters.OwnerUri);
await requestContext.SendError(new Exception(SR.StopSessionFailed(e.Message)));
}
});
@@ -272,12 +285,14 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
try
{
Logger.Write(TraceEventType.Verbose, "HandlePauseProfilingRequest started");
monitor.PauseViewer(parameters.OwnerUri);
await requestContext.SendResult(new PauseProfilingResult { });
}
catch (Exception e)
{
Logger.Write(TraceEventType.Error, "HandlePauseProfilingRequest failed for uri " + parameters.OwnerUri);
await requestContext.SendError(new Exception(SR.PauseSessionFailed(e.Message)));
}
});
@@ -292,6 +307,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
try
{
Logger.Write(TraceEventType.Verbose, "HandleGetXEventSessionsRequest started");
var result = new GetXEventSessionsResult();
ConnectionInfo connInfo;
ConnectionServiceInstance.TryFindConnection(
@@ -299,6 +315,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
out connInfo);
if (connInfo == null)
{
Logger.Write(TraceEventType.Error, "Connection Info could not be found for " + parameters.OwnerUri);
await requestContext.SendError(new Exception(SR.ProfilerConnectionNotFound));
}
else
@@ -310,6 +327,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
}
catch (Exception e)
{
Logger.Write(TraceEventType.Error, "HandleGetXEventSessionsRequest failed for uri " + parameters.OwnerUri);
await requestContext.SendError(e);
}
});
@@ -324,11 +342,13 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
try
{
Logger.Write(TraceEventType.Verbose, "HandleDisconnectSessionRequest started");
ProfilerSession session;
monitor.StopMonitoringSession(parameters.OwnerUri, out session);
}
catch (Exception e)
{
Logger.Write(TraceEventType.Error, "HandleDisconnectSessionRequest failed for uri " + parameters.OwnerUri);
await requestContext.SendError(e);
}
});
@@ -374,6 +394,15 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
return store;
}
/// <summary>
/// Nulls out properties in ConnectionDetails that aren't compatible with XElite.
/// </summary>
private static void RemoveIncompatibleConnectionProperties(ConnectionDetails connDetails){
connDetails.ConnectRetryCount = null;
connDetails.ConnectRetryInterval = null;
connDetails.MultiSubnetFailover = null;
}
/// <summary>
/// Gets an XEvent session with the given name per the IXEventSessionFactory contract
/// Also starts the session if it isn't currently running
@@ -383,6 +412,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
var sqlConnection = ConnectionService.OpenSqlConnection(connInfo);
SqlStoreConnection connection = new SqlStoreConnection(sqlConnection);
BaseXEStore store = CreateXEventStore(connInfo, connection);
RemoveIncompatibleConnectionProperties(connInfo.ConnectionDetails);
Session session = store.Sessions[sessionName];
// start the session if it isn't already running
@@ -399,6 +429,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
// create xevent session wrapper
return new XEventSession()
{
ConnectionDetails = connInfo.ConnectionDetails,
Session = session
};
}

View File

@@ -17,18 +17,13 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
/// </summary>
public class ProfilerSession
{
private static readonly TimeSpan DefaultPollingDelay = TimeSpan.FromSeconds(1);
private object pollingLock = new object();
private bool isPolling = false;
private DateTime lastPollTime = DateTime.Now.Subtract(DefaultPollingDelay);
private TimeSpan pollingDelay = DefaultPollingDelay;
public bool IsStreaming { get; set; }
private ProfilerEvent lastSeenEvent = null;
private bool eventsLost = false;
int lastSeenId = -1;
public bool pollImmediatly = false;
/// <summary>
/// Connection to use for the session
/// </summary>
@@ -39,57 +34,6 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
/// </summary>
public IXEventSession XEventSession { get; set; }
/// <summary>
/// Try to set the session into polling mode if criteria is meet
/// </summary>
/// <returns>True if session set to polling mode, False otherwise</returns>
public bool TryEnterPolling()
{
lock (this.pollingLock)
{
if (pollImmediatly || (!this.isPolling && DateTime.Now.Subtract(this.lastPollTime) >= pollingDelay))
{
this.isPolling = true;
this.lastPollTime = DateTime.Now;
this.pollImmediatly = false;
return true;
}
else
{
return false;
}
}
}
/// <summary>
/// Is the session currently being polled
/// </summary>
public bool IsPolling
{
get
{
return this.isPolling;
}
set
{
lock (this.pollingLock)
{
this.isPolling = value;
}
}
}
/// <summary>
/// The delay between session polls
/// </summary>
public TimeSpan PollingDelay
{
get
{
return pollingDelay;
}
}
/// <summary>
/// Could events have been lost in the last poll
/// </summary>
@@ -106,7 +50,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
/// </summary>
private bool IsProfilerEvent(ProfilerEvent currentEvent)
{
if (string.IsNullOrWhiteSpace(currentEvent.Name) || currentEvent.Values == null)
if (string.IsNullOrWhiteSpace(currentEvent.Name) || currentEvent.Values == null)
{
return false;
}
@@ -145,7 +89,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
public void FilterOldEvents(List<ProfilerEvent> events)
{
this.eventsLost = false;
if (lastSeenId != -1)
{
// find the last event we've previously seen
@@ -169,7 +113,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
events.RemoveRange(0, idx + 1);
}
else if(earliestSeenEventId > (lastSeenId + 1))
else if (earliestSeenEventId > (lastSeenId + 1))
{
// if there's a gap between the expected next event sequence
// and the furthest back event seen, we know we've lost events

View File

@@ -4,19 +4,13 @@
//
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using Microsoft.SqlServer.Management.Sdk.Sfc;
using Microsoft.SqlServer.Management.XEvent;
using Microsoft.SqlTools.ServiceLayer.Connection.Contracts;
using Microsoft.SqlServer.XEvent.XELite;
using Microsoft.SqlTools.ServiceLayer.Connection;
using Microsoft.SqlTools.ServiceLayer.Profiler.Contracts;
using Microsoft.SqlTools.Utility;
namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
@@ -25,14 +19,10 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
/// </summary>
public class ProfilerSessionMonitor : IProfilerSessionMonitor
{
private const int PollingLoopDelay = 1000;
private object sessionsLock = new object();
private object listenersLock = new object();
private object pollingLock = new object();
private Task processorThread = null;
private struct Viewer
@@ -56,6 +46,9 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
// XEvent Session Id's matched to their Profiler Sessions
private Dictionary<int, ProfilerSession> monitoredSessions = new Dictionary<int, ProfilerSession>();
// XEvent Session Id's matched to their stream cancellation tokens
private Dictionary<int, CancellationTokenSource> monitoredCancellationTokenSources = new Dictionary<int, CancellationTokenSource>();
// ViewerId -> Viewer objects
private Dictionary<string, Viewer> allViewers = new Dictionary<string, Viewer>();
@@ -72,10 +65,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
}
}
/// <summary>
/// Start monitoring the provided session
/// </summary>
public bool StartMonitoringSession(string viewerId, IXEventSession session)
public void StartMonitoringSession(string viewerId, IXEventSession session)
{
lock (this.sessionsLock)
{
@@ -115,12 +105,10 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
}
else
{
viewers = new List<string>{ viewerId };
viewers = new List<string> { viewerId };
sessionViewers.Add(session.Id, viewers);
}
}
return true;
}
/// <summary>
@@ -160,8 +148,16 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
lock (this.sessionsLock)
{
//cancel running XEventStream for session.
CancellationTokenSource targetToken;
if (monitoredCancellationTokenSources.Remove(sessionId, out targetToken))
{
targetToken.Cancel();
}
if (this.monitoredSessions.Remove(sessionId, out session))
{
session.IsStreaming = false;
//remove all viewers for this session
List<string> viewerIds;
if (sessionViewers.Remove(sessionId, out viewerIds))
@@ -186,113 +182,105 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
}
}
public void PollSession(int sessionId)
{
lock (this.sessionsLock)
{
this.monitoredSessions[sessionId].pollImmediatly = true;
}
lock (this.pollingLock)
{
Monitor.Pulse(pollingLock);
}
}
/// <summary>
/// The core queue processing method
/// The core queue processing method, cycles through monitored sessions and creates a stream for them if not already.
/// </summary>
/// <param name="state"></param>
private void ProcessSessions()
{
while (true)
{
lock (this.pollingLock)
lock (this.sessionsLock)
{
lock (this.sessionsLock)
foreach (var id in this.monitoredSessions.Keys)
{
foreach (var session in this.monitoredSessions.Values)
ProfilerSession session;
this.monitoredSessions.TryGetValue(id, out session);
if (!session.IsStreaming)
{
List<string> viewers = this.sessionViewers[session.XEventSession.Id];
if (viewers.Any(v => allViewers[v].active))
{
ProcessSession(session);
if (viewers.Any(v => allViewers[v].active)){
StartStream(id, session);
}
}
}
Monitor.Wait(this.pollingLock, PollingLoopDelay);
}
}
}
/// <summary>
/// Process a session for new XEvents if it meets the polling criteria
/// Helper function used to process the XEvent feed from a session's stream.
/// </summary>
private void ProcessSession(ProfilerSession session)
private async Task HandleXEvent(IXEvent xEvent, ProfilerSession session)
{
if (session.TryEnterPolling())
ProfilerEvent profileEvent = new ProfilerEvent(xEvent.Name, xEvent.Timestamp.ToString());
foreach (var kvp in xEvent.Fields)
{
Task.Factory.StartNew(() =>
profileEvent.Values.Add(kvp.Key, kvp.Value.ToString());
}
foreach (var kvp in xEvent.Actions)
{
profileEvent.Values.Add(kvp.Key, kvp.Value.ToString());
}
var eventList = new List<ProfilerEvent>();
eventList.Add(profileEvent);
var eventsLost = session.EventsLost;
if (eventList.Count > 0 || eventsLost)
{
session.FilterOldEvents(eventList);
eventList = session.FilterProfilerEvents(eventList);
// notify all viewers of the event.
List<string> viewerIds = this.sessionViewers[session.XEventSession.Id];
foreach (string viewerId in viewerIds)
{
var events = PollSession(session);
bool eventsLost = session.EventsLost;
if (events.Count > 0 || eventsLost)
if (allViewers[viewerId].active)
{
// notify all viewers for the polled session
List<string> viewerIds = this.sessionViewers[session.XEventSession.Id];
foreach (string viewerId in viewerIds)
{
if (allViewers[viewerId].active)
{
SendEventsToListeners(viewerId, events, eventsLost);
}
}
SendEventsToListeners(viewerId, eventList, eventsLost);
}
});
}
}
}
private List<ProfilerEvent> PollSession(ProfilerSession session)
/// <summary>
/// Function that creates a brand new stream from a session, this is called from ProcessSessions when a session doesn't have a stream running currently.
/// </summary>
private void StartStream(int id, ProfilerSession session)
{
var events = new List<ProfilerEvent>();
try
{
if (session == null || session.XEventSession == null)
if(session.XEventSession != null && session.XEventSession.Session != null && session.XEventSession.ConnectionDetails != null){
CancellationTokenSource threadCancellationToken = new CancellationTokenSource();
var connectionString = ConnectionService.BuildConnectionString(session.XEventSession.ConnectionDetails);
var eventStreamer = new XELiveEventStreamer(connectionString, session.XEventSession.Session.Name);
// Start streaming task here, will run until cancellation or error with the feed.
var task = eventStreamer.ReadEventStream(xEvent => HandleXEvent(xEvent, session), threadCancellationToken.Token);
task.ContinueWith(t =>
{
return events;
}
var targetXml = session.XEventSession.GetTargetXml();
XmlDocument xmlDoc = new XmlDocument();
xmlDoc.LoadXml(targetXml);
var nodes = xmlDoc.DocumentElement.GetElementsByTagName("event");
foreach (XmlNode node in nodes)
{
var profilerEvent = ParseProfilerEvent(node);
if (profilerEvent != null)
//If cancellation token is missing, that means stream was stopped by the client, do not notify in this case.
CancellationTokenSource targetToken;
if (monitoredCancellationTokenSources.TryGetValue(id, out targetToken))
{
events.Add(profilerEvent);
StopSession(session.XEventSession.Id);
}
}
}
catch (XEventException)
{
SendStoppedSessionInfoToListeners(session.XEventSession.Id);
ProfilerSession tempSession;
RemoveSession(session.XEventSession.Id, out tempSession);
}
catch (Exception ex)
{
Logger.Write(TraceEventType.Warning, "Failed to poll session. error: " + ex.Message);
}
finally
{
session.IsPolling = false;
}
}, TaskContinuationOptions.OnlyOnFaulted);
session.FilterOldEvents(events);
return session.FilterProfilerEvents(events);
this.monitoredCancellationTokenSources.Add(id, threadCancellationToken);
session.IsStreaming = true;
}
else {
ProfilerSession tempSession;
RemoveSession(id, out tempSession);
throw new Exception(SR.SessionMissingDetails(id));
}
}
/// <summary>
/// Helper function for notifying listeners and stopping session in case the session is stopped on the server. This is public for tests.
/// </summary>
public void StopSession(int Id){
SendStoppedSessionInfoToListeners(Id);
ProfilerSession tempSession;
RemoveSession(Id, out tempSession);
}
/// <summary>
@@ -304,7 +292,7 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
foreach (var listener in this.listeners)
{
foreach(string viewerId in sessionViewers[sessionId])
foreach (string viewerId in sessionViewers[sessionId])
{
listener.SessionStopped(viewerId, sessionId);
}
@@ -325,31 +313,5 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
}
}
}
/// <summary>
/// Parse a single event node from XEvent XML
/// </summary>
private ProfilerEvent ParseProfilerEvent(XmlNode node)
{
var name = node.Attributes["name"];
var timestamp = node.Attributes["timestamp"];
var profilerEvent = new ProfilerEvent(name.InnerText, timestamp.InnerText);
foreach (XmlNode childNode in node.ChildNodes)
{
var childName = childNode.Attributes["name"];
XmlNode typeNode = childNode.SelectSingleNode("type");
var typeName = typeNode.Attributes["name"];
XmlNode valueNode = childNode.SelectSingleNode("value");
if (!profilerEvent.Values.ContainsKey(childName.InnerText))
{
profilerEvent.Values.Add(childName.InnerText, valueNode.InnerText);
}
}
return profilerEvent;
}
}
}

View File

@@ -5,8 +5,7 @@
using System.Linq;
using Microsoft.SqlServer.Management.XEvent;
using Microsoft.SqlTools.ServiceLayer.Connection;
using Microsoft.SqlTools.ServiceLayer.Profiler.Contracts;
using Microsoft.SqlTools.ServiceLayer.Connection.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
@@ -17,6 +16,8 @@ namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
public Session Session { get; set; }
public ConnectionDetails ConnectionDetails { get; set; }
public int Id
{
get { return Session.ID; }

View File

@@ -65,17 +65,12 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
// start profiling session
await profilerService.HandleStartProfilingRequest(requestParams, requestContext.Object);
profilerService.SessionMonitor.PollSession(1);
// simulate a short polling delay
Thread.Sleep(200);
profilerService.SessionMonitor.PollSession(1);
// wait for polling to finish, or for timeout
System.Timers.Timer pollingTimer = new System.Timers.Timer();
pollingTimer.Interval = 10000;
pollingTimer.Start();
bool timeout = false;
pollingTimer.Elapsed += new System.Timers.ElapsedEventHandler((s_, e_) => {timeout = true;});
pollingTimer.Elapsed += new System.Timers.ElapsedEventHandler((s_, e_) => { timeout = true; });
while (sessionId == null && !timeout)
{
Thread.Sleep(250);
@@ -125,6 +120,8 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
ConnectionInfo connectionInfo = TestObjects.GetTestConnectionInfo();
profilerService.ConnectionServiceInstance.OwnerToConnectionMap.Add(testUri, connectionInfo);
profilerService.XEventSessionFactory = new TestXEventSessionFactory();
mockSession.SetupProperty(p => p.ConnectionDetails, connectionInfo.ConnectionDetails);
mockSession.SetupProperty(p => p.Session, new Session(null, testUri));
var requestParams = new StopProfilingParams();
requestParams.OwnerUri = testUri;
@@ -149,12 +146,12 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
/// </summary>
/// <returns></returns>
[Test]
// TODO: Add more in-depth testing for pause effects on events received.
public async Task TestPauseProfilingRequest()
{
bool success = false;
string testUri = "test_session";
bool recievedEvents = false;
// capture pausing results
var requestContext = new Mock<RequestContext<PauseProfilingResult>>();
requestContext.Setup(rc => rc.SendResult(It.IsAny<PauseProfilingResult>()))
@@ -164,75 +161,34 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
return Task.FromResult(0);
});
// capture Listener event notifications
var mockListener = new Mock<IProfilerSessionListener>();
mockListener.Setup(p => p.EventsAvailable(It.IsAny<string>(), It.IsAny<List<ProfilerEvent>>(), It.IsAny<bool>())).Callback(() =>
{
recievedEvents = true;
});
// setup profiler service
var mockListener = new TestSessionListener();
var profilerService = new ProfilerService();
profilerService.SessionMonitor.AddSessionListener(mockListener.Object);
profilerService.SessionMonitor.AddSessionListener(mockListener);
profilerService.ConnectionServiceInstance = TestObjects.GetTestConnectionService();
ConnectionInfo connectionInfo = TestObjects.GetTestConnectionInfo();
profilerService.ConnectionServiceInstance.OwnerToConnectionMap.Add(testUri, connectionInfo);
var testSession = new TestXEventSession1();
testSession.ConnectionDetails = connectionInfo.ConnectionDetails;
testSession.Session = new Session(null, testUri);
var requestParams = new PauseProfilingParams();
requestParams.OwnerUri = testUri;
// begin monitoring session
profilerService.SessionMonitor.StartMonitoringSession(testUri, new TestXEventSession1());
// poll the session
profilerService.SessionMonitor.PollSession(1);
Thread.Sleep(500);
profilerService.SessionMonitor.PollSession(1);
// wait for polling to finish, or for timeout
System.Timers.Timer pollingTimer = new System.Timers.Timer();
pollingTimer.Interval = 10000;
pollingTimer.Start();
bool timeout = false;
pollingTimer.Elapsed += new System.Timers.ElapsedEventHandler((s_, e_) => {timeout = true;});
while (!recievedEvents && !timeout)
{
Thread.Sleep(250);
}
pollingTimer.Stop();
// confirm that polling works
Assert.True(recievedEvents);
profilerService.SessionMonitor.StartMonitoringSession(testUri, testSession);
// pause viewer
await profilerService.HandlePauseProfilingRequest(requestParams, requestContext.Object);
Assert.True(success);
recievedEvents = false;
success = false;
profilerService.SessionMonitor.PollSession(1);
// confirm that no events were sent to paused Listener
Assert.False(recievedEvents);
// unpause viewer
await profilerService.HandlePauseProfilingRequest(requestParams, requestContext.Object);
Assert.True(success);
profilerService.SessionMonitor.PollSession(1);
// wait for polling to finish, or for timeout
timeout = false;
pollingTimer.Start();
while (!recievedEvents && !timeout)
{
Thread.Sleep(250);
}
// check that events got sent to Listener
Assert.True(recievedEvents);
requestContext.VerifyAll();
}
@@ -263,21 +219,14 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
profilerService.ConnectionServiceInstance = TestObjects.GetTestConnectionService();
ConnectionInfo connectionInfo = TestObjects.GetTestConnectionInfo();
profilerService.ConnectionServiceInstance.OwnerToConnectionMap.Add(testUri, connectionInfo);
mockSession.SetupProperty(p => p.ConnectionDetails, connectionInfo.ConnectionDetails);
mockSession.SetupProperty(p => p.Session, new Session(null, testUri));
// start monitoring test session
profilerService.SessionMonitor.StartMonitoringSession(testUri, mockSession.Object);
// wait for polling to finish, or for timeout
System.Timers.Timer pollingTimer = new System.Timers.Timer();
pollingTimer.Interval = 10000;
pollingTimer.Start();
bool timeout = false;
pollingTimer.Elapsed += new System.Timers.ElapsedEventHandler((s_, e_) => {timeout = true;});
while (sessionStopped == false && !timeout)
{
Thread.Sleep(250);
}
pollingTimer.Stop();
// Call stop session to simulate when a server has stopped a session on its side.
profilerService.SessionMonitor.StopSession(0);
// check that a stopped session notification was sent
Assert.True(sessionStopped);

View File

@@ -123,41 +123,5 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
profilerSession.FilterOldEvents(profilerEvents);
Assert.False(profilerSession.EventsLost);
}
/// <summary>
/// Test the TryEnterPolling method
/// </summary>
[Test]
public void TestTryEnterPolling()
{
DateTime startTime = DateTime.Now;
// create new profiler session
var profilerSession = new ProfilerSession();
// enter the polling block
Assert.True(profilerSession.TryEnterPolling());
Assert.True(profilerSession.IsPolling);
// verify we can't enter again
Assert.False(profilerSession.TryEnterPolling());
// set polling to false to exit polling block
profilerSession.IsPolling = false;
bool outsideDelay = DateTime.Now.Subtract(startTime) >= profilerSession.PollingDelay;
// verify we can only enter again if we're outside polling delay interval
Assert.AreEqual(profilerSession.TryEnterPolling(), outsideDelay);
// reset IsPolling in case the delay has elasped on slow machine or while debugging
profilerSession.IsPolling = false;
// wait for the polling delay to elapse
Thread.Sleep(profilerSession.PollingDelay);
// verify we can enter the polling block again
Assert.True(profilerSession.TryEnterPolling());
}
}
}

View File

@@ -11,6 +11,7 @@ using System.Threading.Tasks;
using Microsoft.SqlServer.Management.XEvent;
using Microsoft.SqlTools.Hosting.Protocol;
using Microsoft.SqlTools.ServiceLayer.Connection;
using Microsoft.SqlTools.ServiceLayer.Connection.Contracts;
using Microsoft.SqlTools.ServiceLayer.Profiler;
using Microsoft.SqlTools.ServiceLayer.Profiler.Contracts;
using Microsoft.SqlTools.ServiceLayer.UnitTests.Utility;
@@ -206,6 +207,10 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
public int Id { get { return 51; } }
public ConnectionDetails ConnectionDetails { get; set; }
public Session Session { get; set; }
public void Start(){}
public void Stop(){}
@@ -291,6 +296,10 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
public int Id { get { return 1; } }
public ConnectionDetails ConnectionDetails { get; set; }
public Session Session { get; set; }
public void Start(){}
public void Stop(){}
@@ -382,6 +391,10 @@ namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
public int Id { get { return 2; } }
public ConnectionDetails ConnectionDetails { get; set; }
public Session Session { get; set; }
public void Start(){}
public void Stop(){}