//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
#nullable disable
using Microsoft.SqlServer.XEvent.XELite;
using Microsoft.SqlTools.ServiceLayer.Profiler.Contracts;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.SqlTools.ServiceLayer.Profiler
{
///
/// Wrapper XEventSession for IXEventFetcher instances
///
class ObservableXEventSession : XEventSession, IObservableXEventSession
{
private readonly XeStreamObservable observableSession;
private readonly SessionId sessionId;
public IObservable ObservableSessionEvents => observableSession;
public override void Start()
{
Session?.Start();
observableSession.Start();
}
public override void Stop()
{
observableSession.Close();
Session?.Stop();
}
public ObservableXEventSession(Func xeventFetcher, SessionId sessionId)
{
observableSession = new XeStreamObservable(xeventFetcher);
this.sessionId = sessionId;
}
protected override SessionId GetSessionId()
{
return sessionId;
}
}
///
/// Source of ProfilerEvent push notifications. Wraps IXEventFetcher.
///
public class XeStreamObservable : IObservable
{
private readonly object syncObj = new object();
private readonly List> observers = new List>();
private CancellationTokenSource cancellationTokenSource;
private readonly Func xeventFetcher;
///
/// Constructs a new XeStreamObservable that converts xevent data from the fetcher to ProfilerEvent instances
///
///
public XeStreamObservable(Func fetcher)
{
xeventFetcher = fetcher;
}
///
/// Starts processing xevents from the source.
///
public void Start()
{
try
{
cancellationTokenSource = new CancellationTokenSource();
var xeventFetcherFuncCallBack = xeventFetcher();
var xeventFetcherTask = xeventFetcherFuncCallBack.ReadEventStream(OnEventRead, cancellationTokenSource.Token);
xeventFetcherTask.ContinueWith(OnStreamClosed);
} catch (Exception ex)
{
Task.FromException(ex).ContinueWith(OnStreamClosed);
}
}
///
/// Stops the xevent fetching task and informs all listeners that the event stream has ended and clears the list of listeners.
/// Start could be called again, but only new subscribers will see the data.
///
public void Close()
{
cancellationTokenSource.Cancel();
var currentObservers = CurrentObservers;
currentObservers.ForEach(o => o.OnCompleted());
lock (syncObj)
{
currentObservers.ForEach(o => observers.Remove(o));
}
}
///
/// Adds the observer to the listener list
///
///
/// An IDisposable for the listener to call when it no longer wishes to receive events
public IDisposable Subscribe(IObserver observer)
{
lock (syncObj)
{
if (!observers.Contains(observer))
{
observers.Add(observer);
}
return new Unsubscriber(observers, observer);
}
}
private List> CurrentObservers
{
get
{
lock (syncObj)
{
return new List>(observers);
}
}
}
private void OnStreamClosed(Task fetcherTask)
{
if (fetcherTask.IsFaulted)
{
CurrentObservers.ForEach(o => o.OnError(fetcherTask.Exception));
}
Close();
}
private Task OnEventRead(IXEvent xEvent)
{
ProfilerEvent profileEvent = new ProfilerEvent(xEvent.Name, xEvent.Timestamp.ToString());
foreach (var kvp in xEvent.Fields)
{
profileEvent.Values.Add(kvp.Key, kvp.Value.ToString());
}
CurrentObservers.ForEach(o => o.OnNext(profileEvent));
return Task.FromResult(0);
}
private class Unsubscriber : IDisposable
{
private readonly List> _observers;
private readonly IObserver _observer;
public Unsubscriber(List> observers, IObserver observer)
{
_observers = observers;
_observer = observer;
}
public void Dispose()
{
if (_observer != null && _observers.Contains(_observer))
{
_observers.Remove(_observer);
}
}
}
}
}