//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.PowerShell.EditorServices.Utility
{
///
/// Provides a synchronized queue which can be used from within async
/// operations. This is primarily used for producer/consumer scenarios.
///
/// The type of item contained in the queue.
public class AsyncQueue
{
#region Private Fields
private AsyncLock queueLock = new AsyncLock();
private Queue itemQueue;
private Queue> requestQueue;
#endregion
#region Properties
///
/// Returns true if the queue is currently empty.
///
public bool IsEmpty { get; private set; }
#endregion
#region Constructors
///
/// Initializes an empty instance of the AsyncQueue class.
///
public AsyncQueue() : this(Enumerable.Empty())
{
}
///
/// Initializes an instance of the AsyncQueue class, pre-populated
/// with the given collection of items.
///
///
/// An IEnumerable containing the initial items with which the queue will
/// be populated.
///
public AsyncQueue(IEnumerable initialItems)
{
this.itemQueue = new Queue(initialItems);
this.requestQueue = new Queue>();
}
#endregion
#region Public Methods
///
/// Enqueues an item onto the end of the queue.
///
/// The item to be added to the queue.
///
/// A Task which can be awaited until the synchronized enqueue
/// operation completes.
///
public async Task EnqueueAsync(T item)
{
using (await queueLock.LockAsync())
{
TaskCompletionSource requestTaskSource = null;
// Are any requests waiting?
while (this.requestQueue.Count > 0)
{
// Is the next request cancelled already?
requestTaskSource = this.requestQueue.Dequeue();
if (!requestTaskSource.Task.IsCanceled)
{
// Dispatch the item
requestTaskSource.SetResult(item);
return;
}
}
// No more requests waiting, queue the item for a later request
this.itemQueue.Enqueue(item);
this.IsEmpty = false;
}
}
///
/// Dequeues an item from the queue or waits asynchronously
/// until an item is available.
///
///
/// A Task which can be awaited until a value can be dequeued.
///
public Task DequeueAsync()
{
return this.DequeueAsync(CancellationToken.None);
}
///
/// Dequeues an item from the queue or waits asynchronously
/// until an item is available. The wait can be cancelled
/// using the given CancellationToken.
///
///
/// A CancellationToken with which a dequeue wait can be cancelled.
///
///
/// A Task which can be awaited until a value can be dequeued.
///
public async Task DequeueAsync(CancellationToken cancellationToken)
{
Task requestTask;
using (await queueLock.LockAsync(cancellationToken))
{
if (this.itemQueue.Count > 0)
{
// Items are waiting to be taken so take one immediately
T item = this.itemQueue.Dequeue();
this.IsEmpty = this.itemQueue.Count == 0;
return item;
}
else
{
// Queue the request for the next item
var requestTaskSource = new TaskCompletionSource();
this.requestQueue.Enqueue(requestTaskSource);
// Register the wait task for cancel notifications
cancellationToken.Register(
() => requestTaskSource.TrySetCanceled());
requestTask = requestTaskSource.Task;
}
}
// Wait for the request task to complete outside of the lock
return await requestTask;
}
#endregion
}
}