//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
#nullable disable
using System;
using System.Collections.Generic;
using System.IO;
using NUnit.Framework;
using System.Reflection;
using Microsoft.SqlTools.ServiceLayer.Profiler.Contracts;
using Microsoft.SqlTools.ServiceLayer.Profiler;
using System.Threading;
using System.Linq;
using Moq;
namespace Microsoft.SqlTools.ServiceLayer.UnitTests.Profiler
{
/// <summary>
/// Tests that drive an <see cref="XeStreamObservable"/> from an XEL file path and
/// inspect what a recording <see cref="XeStreamObserver"/> receives.
/// </summary>
public class XeStreamObservableTests
{
    /// <summary>
    /// Delay, in milliseconds, between successive checks of the observer's Completed flag.
    /// </summary>
    private const int PollIntervalMilliseconds = 100;

    /// <summary>
    /// This might technically be an integration test, but it lives here because it
    /// doesn't require any connectivity.
    /// </summary>
    [Test]
    public void XeStreamObservable_reads_entire_xel_file()
    {
        var observer = InitializeFileObserver();
        observer.Observable.Start();

        // 100 polls x 100 ms = the 10 seconds quoted in the failure message below.
        WaitForCompletion(observer, retries: 100);

        Assert.That(observer.Completed, Is.True, $"Reading the file didn't complete in 10 seconds. Events read: {observer.ProfilerEvents.Count}");
        Assert.Multiple(() =>
        {
            Assert.That(observer.ProfilerEvents.Count, Is.EqualTo(149), "Number of events read");
            Assert.That(observer.ProfilerEvents[0].Name, Is.EqualTo("rpc_completed"), "First event in the file");
            Assert.That(observer.ProfilerEvents.Last().Name, Is.EqualTo("sql_batch_completed"), "Last event in the file");
        });
    }

    /// <summary>
    /// Points the observable at a path that does not exist and expects the observer to
    /// be completed with an error recorded.
    /// </summary>
    [Test]
    public void XeStreamObservable_calls_OnError_when_the_fetcher_fails()
    {
        var observer = InitializeFileObserver("thispathdoesnotexist.xel");
        observer.Observable.Start();

        // 10 polls x 100 ms = the 1 second quoted in the failure message below.
        WaitForCompletion(observer, retries: 10);

        Assert.Multiple(() =>
        {
            Assert.That(observer.Completed, Is.True, "Reading the missing file didn't complete in 1 second.");
            // NOTE(review): the type argument of Is.InstanceOf was missing from the source text;
            // FileNotFoundException is assumed from the "missing file" scenario - confirm.
            Assert.That(observer.Error?.GetBaseException(), Is.InstanceOf<FileNotFoundException>(), $"Expected Error from missing file. Error:{observer.Error}");
        });
    }

    /// <summary>
    /// Sleeps in <see cref="PollIntervalMilliseconds"/> steps until the observer reports
    /// completion or the retry budget is used up. Callers assert on the outcome themselves.
    /// </summary>
    /// <param name="observer">Observer whose Completed flag is polled.</param>
    /// <param name="retries">Maximum number of sleep intervals to wait.</param>
    private static void WaitForCompletion(XeStreamObserver observer, int retries)
    {
        while (!observer.Completed && retries-- > 0)
        {
            Thread.Sleep(PollIntervalMilliseconds);
        }
    }

    /// <summary>
    /// Builds an <see cref="XeStreamObservable"/> whose fetcher factory calls
    /// <c>initIXEventFetcher</c> on a profiler service for the given path, and subscribes
    /// a new <see cref="XeStreamObserver"/> to it. The observable is not started here.
    /// </summary>
    /// <param name="filePath">
    /// Path of the XEL file to read; when null, "Profiler/TestXel_0.xel" next to the
    /// executing assembly is used.
    /// </param>
    /// <returns>The subscribed observer, with its Observable property set.</returns>
    private XeStreamObserver InitializeFileObserver(string filePath = null)
    {
        filePath ??= Path.Combine(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "Profiler", "TestXel_0.xel");
        var profilerService = SetupProfilerService(filePath);
        var xeStreamObservable = new XeStreamObservable(() => profilerService.initIXEventFetcher(filePath));
        var observer = new XeStreamObserver() { Observable = xeStreamObservable };
        xeStreamObservable.Subscribe(observer);
        return observer;
    }

    /// <summary>
    /// Creates a <see cref="ProfilerService"/> whose XEventSessionFactory is a Moq mock.
    /// </summary>
    /// <param name="filePath">Not used by this method; kept so existing call sites still compile.</param>
    /// <returns>The configured profiler service.</returns>
    private ProfilerService SetupProfilerService(string filePath = null)
    {
        // NOTE(review): the type argument of Mock was missing from the source text;
        // IXEventSessionFactory is assumed from the property it is assigned to - confirm.
        var sessionFactory = new Mock<IXEventSessionFactory>();
        return new ProfilerService() { XEventSessionFactory = sessionFactory.Object };
    }
}
/// <summary>
/// Observer used by the tests: it stores every <see cref="ProfilerEvent"/> it is given
/// and remembers whether completion or an error was signalled.
/// </summary>
sealed class XeStreamObserver : IObserver<ProfilerEvent>
{
    /// <summary>
    /// The observable associated with this observer. It is assigned by the code that
    /// creates the observer; this class itself never reads it.
    /// </summary>
    public XeStreamObservable Observable { get; set; }

    /// <summary>
    /// Events received through <see cref="OnNext"/>, in arrival order.
    /// </summary>
    public readonly List<ProfilerEvent> ProfilerEvents = new List<ProfilerEvent>();

    /// <summary>
    /// True once <see cref="OnCompleted"/> has been called.
    /// </summary>
    public bool Completed { get; private set; }

    /// <summary>
    /// The exception most recently passed to <see cref="OnError"/>, or null if none was.
    /// </summary>
    public Exception Error { get; private set; }

    /// <summary>
    /// Marks the observer as completed.
    /// </summary>
    public void OnCompleted()
    {
        Completed = true;
    }

    /// <summary>
    /// Records the error. Note that this does not set <see cref="Completed"/>.
    /// </summary>
    /// <param name="error">The exception being reported.</param>
    public void OnError(Exception error)
    {
        Error = error;
    }

    /// <summary>
    /// Appends the event to <see cref="ProfilerEvents"/> and then raises
    /// <see cref="OnEventAdded"/>.
    /// </summary>
    /// <param name="value">The event to record.</param>
    public void OnNext(ProfilerEvent value)
    {
        ProfilerEvents.Add(value);
        OnEventAdded?.Invoke(this, EventArgs.Empty);
    }

    /// <summary>
    /// Raised after each event is added to <see cref="ProfilerEvents"/>.
    /// </summary>
    public event EventHandler OnEventAdded;
}
}