mirror of
https://github.com/ckaczor/sqltoolsservice.git
synced 2026-01-30 09:35:38 -05:00
Add "Open XEL file" support to profiler in sqltoolsservice (#2091)
* Open XEL file changes * placeholders for openxel * add observable xe reader * md format tweaks * implement localfile as a new session type * add ErrorMessage to session stopped notice * fix flaky test * handle already running session * fix stopped session event send on file completion * fix flaky unit test * Update XElite and dependent versions * Fix errors after merge and remove failing tests for now * Fix main merge mess-up. Address comments. Add one more relevant test. * Remove extra namespace. * Remove unnecessary import * Fix build error * Address comments. * Remove disabiling JSON002 compiler warning * Address comments and update json handling * Fix build error * Fix integration test (emerged due to Main merge mess up) * Clean up code (no functional changes) --------- Co-authored-by: Karl Burtram <karlb@microsoft.com> Co-authored-by: shueybubbles <david.shiflet@microsoft.com>
This commit is contained in:
@@ -0,0 +1,170 @@
|
||||
//
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
//
|
||||
|
||||
#nullable disable
|
||||
|
||||
using Microsoft.SqlServer.XEvent.XELite;
|
||||
using Microsoft.SqlTools.ServiceLayer.Profiler.Contracts;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.SqlTools.ServiceLayer.Profiler
|
||||
{
|
||||
/// <summary>
|
||||
/// Wrapper XEventSession for IXEventFetcher instances
|
||||
/// </summary>
|
||||
class ObservableXEventSession : XEventSession, IObservableXEventSession
|
||||
{
|
||||
private readonly XeStreamObservable observableSession;
|
||||
private readonly SessionId sessionId;
|
||||
public IObservable<ProfilerEvent> ObservableSessionEvents => observableSession;
|
||||
|
||||
public override void Start()
|
||||
{
|
||||
Session?.Start();
|
||||
observableSession.Start();
|
||||
}
|
||||
|
||||
public override void Stop()
|
||||
{
|
||||
observableSession.Close();
|
||||
Session?.Stop();
|
||||
}
|
||||
|
||||
public ObservableXEventSession(Func<IXEventFetcher> xeventFetcher, SessionId sessionId)
|
||||
{
|
||||
observableSession = new XeStreamObservable(xeventFetcher);
|
||||
this.sessionId = sessionId;
|
||||
}
|
||||
|
||||
protected override SessionId GetSessionId()
|
||||
{
|
||||
return sessionId;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Source of ProfilerEvent push notifications. Wraps IXEventFetcher.
|
||||
/// </summary>
|
||||
public class XeStreamObservable : IObservable<ProfilerEvent>
|
||||
{
|
||||
private readonly object syncObj = new object();
|
||||
private readonly List<IObserver<ProfilerEvent>> observers = new List<IObserver<ProfilerEvent>>();
|
||||
private CancellationTokenSource cancellationTokenSource;
|
||||
private readonly Func<IXEventFetcher> xeventFetcher;
|
||||
|
||||
/// <summary>
|
||||
/// Constructs a new XeStreamObservable that converts xevent data from the fetcher to ProfilerEvent instances
|
||||
/// </summary>
|
||||
/// <param name="fetcher"></param>
|
||||
public XeStreamObservable(Func<IXEventFetcher> fetcher)
|
||||
{
|
||||
xeventFetcher = fetcher;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Starts processing xevents from the source.
|
||||
/// </summary>
|
||||
public void Start()
|
||||
{
|
||||
try
|
||||
{
|
||||
cancellationTokenSource = new CancellationTokenSource();
|
||||
var xeventFetcherFuncCallBack = xeventFetcher();
|
||||
var xeventFetcherTask = xeventFetcherFuncCallBack.ReadEventStream(OnEventRead, cancellationTokenSource.Token);
|
||||
xeventFetcherTask.ContinueWith(OnStreamClosed);
|
||||
} catch (Exception ex)
|
||||
{
|
||||
Task.FromException<IXEventFetcher>(ex).ContinueWith(OnStreamClosed);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stops the xevent fetching task and informs all listeners that the event stream has ended and clears the list of listeners.
|
||||
/// Start could be called again, but only new subscribers will see the data.
|
||||
/// </summary>
|
||||
public void Close()
|
||||
{
|
||||
cancellationTokenSource.Cancel();
|
||||
var currentObservers = CurrentObservers;
|
||||
currentObservers.ForEach(o => o.OnCompleted());
|
||||
lock (syncObj)
|
||||
{
|
||||
currentObservers.ForEach(o => observers.Remove(o));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds the observer to the listener list
|
||||
/// </summary>
|
||||
/// <param name="observer"></param>
|
||||
/// <returns>An IDisposable for the listener to call when it no longer wishes to receive events</returns>
|
||||
public IDisposable Subscribe(IObserver<ProfilerEvent> observer)
|
||||
{
|
||||
lock (syncObj)
|
||||
{
|
||||
if (!observers.Contains(observer))
|
||||
{
|
||||
observers.Add(observer);
|
||||
}
|
||||
return new Unsubscriber(observers, observer);
|
||||
}
|
||||
}
|
||||
|
||||
private List<IObserver<ProfilerEvent>> CurrentObservers
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (syncObj)
|
||||
{
|
||||
return new List<IObserver<ProfilerEvent>>(observers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void OnStreamClosed(Task fetcherTask)
|
||||
{
|
||||
if (fetcherTask.IsFaulted)
|
||||
{
|
||||
CurrentObservers.ForEach(o => o.OnError(fetcherTask.Exception));
|
||||
}
|
||||
Close();
|
||||
}
|
||||
|
||||
private Task OnEventRead(IXEvent xEvent)
|
||||
{
|
||||
ProfilerEvent profileEvent = new ProfilerEvent(xEvent.Name, xEvent.Timestamp.ToString());
|
||||
foreach (var kvp in xEvent.Fields)
|
||||
{
|
||||
profileEvent.Values.Add(kvp.Key, kvp.Value.ToString());
|
||||
}
|
||||
CurrentObservers.ForEach(o => o.OnNext(profileEvent));
|
||||
return Task.FromResult(0);
|
||||
}
|
||||
|
||||
private class Unsubscriber : IDisposable
|
||||
{
|
||||
private readonly List<IObserver<ProfilerEvent>> _observers;
|
||||
private readonly IObserver<ProfilerEvent> _observer;
|
||||
|
||||
public Unsubscriber(List<IObserver<ProfilerEvent>> observers, IObserver<ProfilerEvent> observer)
|
||||
{
|
||||
_observers = observers;
|
||||
_observer = observer;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_observer != null && _observers.Contains(_observer))
|
||||
{
|
||||
_observers.Remove(_observer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user