mirror of
https://github.com/ckaczor/azuredatastudio.git
synced 2026-03-29 00:00:29 -04:00
Merge from vscode 966b87dd4013be1a9c06e2b8334522ec61905cc2 (#4696)
This commit is contained in:
774
src/vs/base/parts/ipc/common/ipc.net.ts
Normal file
774
src/vs/base/parts/ipc/common/ipc.net.ts
Normal file
@@ -0,0 +1,774 @@
|
||||
/*---------------------------------------------------------------------------------------------
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { Event, Emitter } from 'vs/base/common/event';
|
||||
import { IMessagePassingProtocol, IPCClient } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { IDisposable, Disposable, dispose } from 'vs/base/common/lifecycle';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
import * as platform from 'vs/base/common/platform';
|
||||
|
||||
declare var process: any;
|
||||
|
||||
export interface ISocket {
|
||||
onData(listener: (e: VSBuffer) => void): IDisposable;
|
||||
onClose(listener: () => void): IDisposable;
|
||||
onEnd(listener: () => void): IDisposable;
|
||||
write(buffer: VSBuffer): void;
|
||||
end(): void;
|
||||
}
|
||||
|
||||
let emptyBuffer: VSBuffer | null = null;
|
||||
function getEmptyBuffer(): VSBuffer {
|
||||
if (!emptyBuffer) {
|
||||
emptyBuffer = VSBuffer.alloc(0);
|
||||
}
|
||||
return emptyBuffer;
|
||||
}
|
||||
|
||||
class ChunkStream {
|
||||
|
||||
private _chunks: VSBuffer[];
|
||||
private _totalLength: number;
|
||||
|
||||
public get byteLength() {
|
||||
return this._totalLength;
|
||||
}
|
||||
|
||||
constructor() {
|
||||
this._chunks = [];
|
||||
this._totalLength = 0;
|
||||
}
|
||||
|
||||
public acceptChunk(buff: VSBuffer) {
|
||||
this._chunks.push(buff);
|
||||
this._totalLength += buff.byteLength;
|
||||
}
|
||||
|
||||
public read(byteCount: number): VSBuffer {
|
||||
if (byteCount === 0) {
|
||||
return getEmptyBuffer();
|
||||
}
|
||||
|
||||
if (byteCount > this._totalLength) {
|
||||
throw new Error(`Cannot read so many bytes!`);
|
||||
}
|
||||
|
||||
if (this._chunks[0].byteLength === byteCount) {
|
||||
// super fast path, precisely first chunk must be returned
|
||||
const result = this._chunks.shift()!;
|
||||
this._totalLength -= byteCount;
|
||||
return result;
|
||||
}
|
||||
|
||||
if (this._chunks[0].byteLength > byteCount) {
|
||||
// fast path, the reading is entirely within the first chunk
|
||||
const result = this._chunks[0].slice(0, byteCount);
|
||||
this._chunks[0] = this._chunks[0].slice(byteCount);
|
||||
this._totalLength -= byteCount;
|
||||
return result;
|
||||
}
|
||||
|
||||
let result = VSBuffer.alloc(byteCount);
|
||||
let resultOffset = 0;
|
||||
while (byteCount > 0) {
|
||||
const chunk = this._chunks[0];
|
||||
if (chunk.byteLength > byteCount) {
|
||||
// this chunk will survive
|
||||
this._chunks[0] = chunk.slice(byteCount);
|
||||
|
||||
const chunkPart = chunk.slice(0, byteCount);
|
||||
result.set(chunkPart, resultOffset);
|
||||
resultOffset += byteCount;
|
||||
this._totalLength -= byteCount;
|
||||
byteCount -= byteCount;
|
||||
} else {
|
||||
// this chunk will be entirely read
|
||||
this._chunks.shift();
|
||||
|
||||
result.set(chunk, resultOffset);
|
||||
resultOffset += chunk.byteLength;
|
||||
this._totalLength -= chunk.byteLength;
|
||||
byteCount -= chunk.byteLength;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
const enum ProtocolMessageType {
|
||||
None = 0,
|
||||
Regular = 1,
|
||||
Control = 2,
|
||||
Ack = 3,
|
||||
KeepAlive = 4
|
||||
}
|
||||
|
||||
export const enum ProtocolConstants {
|
||||
HeaderLength = 13,
|
||||
/**
|
||||
* Send an Acknowledge message at most 2 seconds later...
|
||||
*/
|
||||
AcknowledgeTime = 2000, // 2 seconds
|
||||
/**
|
||||
* If there is a message that has been unacknowledged for 10 seconds, consider the connection closed...
|
||||
*/
|
||||
AcknowledgeTimeoutTime = 10000, // 10 seconds
|
||||
/**
|
||||
* Send at least a message every 30s for keep alive reasons.
|
||||
*/
|
||||
KeepAliveTime = 30000, // 30 seconds
|
||||
/**
|
||||
* If there is no message received for 60 seconds, consider the connection closed...
|
||||
*/
|
||||
KeepAliveTimeoutTime = 60000, // 60 seconds
|
||||
/**
|
||||
* If there is no reconnection within this time-frame, consider the connection permanently closed...
|
||||
*/
|
||||
ReconnectionGraceTime = 60 * 60 * 1000, // 1hr
|
||||
}
|
||||
|
||||
class ProtocolMessage {
|
||||
|
||||
public writtenTime: number;
|
||||
|
||||
constructor(
|
||||
public readonly type: ProtocolMessageType,
|
||||
public readonly id: number,
|
||||
public readonly ack: number,
|
||||
public readonly data: VSBuffer
|
||||
) {
|
||||
this.writtenTime = 0;
|
||||
}
|
||||
|
||||
public get size(): number {
|
||||
return this.data.byteLength;
|
||||
}
|
||||
}
|
||||
|
||||
class ProtocolReader extends Disposable {
|
||||
|
||||
private readonly _socket: ISocket;
|
||||
private _isDisposed: boolean;
|
||||
private readonly _incomingData: ChunkStream;
|
||||
public lastReadTime: number;
|
||||
|
||||
private readonly _onMessage = new Emitter<ProtocolMessage>();
|
||||
public readonly onMessage: Event<ProtocolMessage> = this._onMessage.event;
|
||||
|
||||
private readonly _state = {
|
||||
readHead: true,
|
||||
readLen: ProtocolConstants.HeaderLength,
|
||||
messageType: ProtocolMessageType.None,
|
||||
id: 0,
|
||||
ack: 0
|
||||
};
|
||||
|
||||
constructor(socket: ISocket) {
|
||||
super();
|
||||
this._socket = socket;
|
||||
this._isDisposed = false;
|
||||
this._incomingData = new ChunkStream();
|
||||
this._register(this._socket.onData(data => this.acceptChunk(data)));
|
||||
this.lastReadTime = Date.now();
|
||||
}
|
||||
|
||||
public acceptChunk(data: VSBuffer | null): void {
|
||||
if (!data || data.byteLength === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.lastReadTime = Date.now();
|
||||
|
||||
this._incomingData.acceptChunk(data);
|
||||
|
||||
while (this._incomingData.byteLength >= this._state.readLen) {
|
||||
|
||||
const buff = this._incomingData.read(this._state.readLen);
|
||||
|
||||
if (this._state.readHead) {
|
||||
// buff is the header
|
||||
|
||||
// save new state => next time will read the body
|
||||
this._state.readHead = false;
|
||||
this._state.readLen = buff.readUint32BE(9);
|
||||
this._state.messageType = <ProtocolMessageType>buff.readUint8(0);
|
||||
this._state.id = buff.readUint32BE(1);
|
||||
this._state.ack = buff.readUint32BE(5);
|
||||
} else {
|
||||
// buff is the body
|
||||
const messageType = this._state.messageType;
|
||||
const id = this._state.id;
|
||||
const ack = this._state.ack;
|
||||
|
||||
// save new state => next time will read the header
|
||||
this._state.readHead = true;
|
||||
this._state.readLen = ProtocolConstants.HeaderLength;
|
||||
this._state.messageType = ProtocolMessageType.None;
|
||||
this._state.id = 0;
|
||||
this._state.ack = 0;
|
||||
|
||||
this._onMessage.fire(new ProtocolMessage(messageType, id, ack, buff));
|
||||
|
||||
if (this._isDisposed) {
|
||||
// check if an event listener lead to our disposal
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public readEntireBuffer(): VSBuffer {
|
||||
return this._incomingData.read(this._incomingData.byteLength);
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
this._isDisposed = true;
|
||||
super.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
class ProtocolWriter {
|
||||
|
||||
private _isDisposed: boolean;
|
||||
private readonly _socket: ISocket;
|
||||
private _data: VSBuffer[];
|
||||
private _totalLength: number;
|
||||
public lastWriteTime: number;
|
||||
|
||||
constructor(socket: ISocket) {
|
||||
this._isDisposed = false;
|
||||
this._socket = socket;
|
||||
this._data = [];
|
||||
this._totalLength = 0;
|
||||
this.lastWriteTime = 0;
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
this.flush();
|
||||
this._isDisposed = true;
|
||||
}
|
||||
|
||||
public flush(): void {
|
||||
// flush
|
||||
this._writeNow();
|
||||
}
|
||||
|
||||
public write(msg: ProtocolMessage) {
|
||||
if (this._isDisposed) {
|
||||
console.warn(`Cannot write message in a disposed ProtocolWriter`);
|
||||
console.warn(msg);
|
||||
return;
|
||||
}
|
||||
msg.writtenTime = Date.now();
|
||||
this.lastWriteTime = Date.now();
|
||||
const header = VSBuffer.alloc(ProtocolConstants.HeaderLength);
|
||||
header.writeUint8(msg.type, 0);
|
||||
header.writeUint32BE(msg.id, 1);
|
||||
header.writeUint32BE(msg.ack, 5);
|
||||
header.writeUint32BE(msg.data.byteLength, 9);
|
||||
this._writeSoon(header, msg.data);
|
||||
}
|
||||
|
||||
private _bufferAdd(head: VSBuffer, body: VSBuffer): boolean {
|
||||
const wasEmpty = this._totalLength === 0;
|
||||
this._data.push(head, body);
|
||||
this._totalLength += head.byteLength + body.byteLength;
|
||||
return wasEmpty;
|
||||
}
|
||||
|
||||
private _bufferTake(): VSBuffer {
|
||||
const ret = VSBuffer.concat(this._data, this._totalLength);
|
||||
this._data.length = 0;
|
||||
this._totalLength = 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
private _writeSoon(header: VSBuffer, data: VSBuffer): void {
|
||||
if (this._bufferAdd(header, data)) {
|
||||
platform.setImmediate(() => {
|
||||
this._writeNow();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private _writeNow(): void {
|
||||
if (this._totalLength === 0) {
|
||||
return;
|
||||
}
|
||||
this._socket.write(this._bufferTake());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has the following format:
|
||||
* ```
|
||||
* /-------------------------------|------\
|
||||
* | HEADER | |
|
||||
* |-------------------------------| DATA |
|
||||
* | TYPE | ID | ACK | DATA_LENGTH | |
|
||||
* \-------------------------------|------/
|
||||
* ```
|
||||
* The header is 9 bytes and consists of:
|
||||
* - TYPE is 1 byte (ProtocolMessageType) - the message type
|
||||
* - ID is 4 bytes (u32be) - the message id (can be 0 to indicate to be ignored)
|
||||
* - ACK is 4 bytes (u32be) - the acknowledged message id (can be 0 to indicate to be ignored)
|
||||
* - DATA_LENGTH is 4 bytes (u32be) - the length in bytes of DATA
|
||||
*
|
||||
* Only Regular messages are counted, other messages are not counted, nor acknowledged.
|
||||
*/
|
||||
export class Protocol extends Disposable implements IMessagePassingProtocol {
|
||||
|
||||
private _socket: ISocket;
|
||||
private _socketWriter: ProtocolWriter;
|
||||
private _socketReader: ProtocolReader;
|
||||
|
||||
private _onMessage = new Emitter<VSBuffer>();
|
||||
readonly onMessage: Event<VSBuffer> = this._onMessage.event;
|
||||
|
||||
private _onClose = new Emitter<void>();
|
||||
readonly onClose: Event<void> = this._onClose.event;
|
||||
|
||||
constructor(socket: ISocket) {
|
||||
super();
|
||||
this._socket = socket;
|
||||
this._socketWriter = this._register(new ProtocolWriter(this._socket));
|
||||
this._socketReader = this._register(new ProtocolReader(this._socket));
|
||||
|
||||
this._register(this._socketReader.onMessage((msg) => {
|
||||
if (msg.type === ProtocolMessageType.Regular) {
|
||||
this._onMessage.fire(msg.data);
|
||||
}
|
||||
}));
|
||||
|
||||
this._register(this._socket.onClose(() => this._onClose.fire()));
|
||||
}
|
||||
|
||||
getSocket(): ISocket {
|
||||
return this._socket;
|
||||
}
|
||||
|
||||
send(buffer: VSBuffer): void {
|
||||
this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.Regular, 0, 0, buffer));
|
||||
}
|
||||
}
|
||||
|
||||
export class Client<TContext = string> extends IPCClient<TContext> {
|
||||
|
||||
static fromSocket<TContext = string>(socket: ISocket, id: TContext): Client<TContext> {
|
||||
return new Client(new Protocol(socket), id);
|
||||
}
|
||||
|
||||
get onClose(): Event<void> { return this.protocol.onClose; }
|
||||
|
||||
constructor(private protocol: Protocol | PersistentProtocol, id: TContext) {
|
||||
super(protocol, id);
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
super.dispose();
|
||||
const socket = this.protocol.getSocket();
|
||||
this.protocol.dispose();
|
||||
socket.end();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will ensure no messages are lost if there are no event listeners.
|
||||
*/
|
||||
function createBufferedEvent<T>(source: Event<T>): Event<T> {
|
||||
let emitter: Emitter<T>;
|
||||
let hasListeners = false;
|
||||
let isDeliveringMessages = false;
|
||||
let bufferedMessages: T[] = [];
|
||||
|
||||
const deliverMessages = () => {
|
||||
if (isDeliveringMessages) {
|
||||
return;
|
||||
}
|
||||
isDeliveringMessages = true;
|
||||
while (hasListeners && bufferedMessages.length > 0) {
|
||||
emitter.fire(bufferedMessages.shift()!);
|
||||
}
|
||||
isDeliveringMessages = false;
|
||||
};
|
||||
|
||||
source((e: T) => {
|
||||
bufferedMessages.push(e);
|
||||
deliverMessages();
|
||||
});
|
||||
|
||||
emitter = new Emitter<T>({
|
||||
onFirstListenerAdd: () => {
|
||||
hasListeners = true;
|
||||
// it is important to deliver these messages after this call, but before
|
||||
// other messages have a chance to be received (to guarantee in order delivery)
|
||||
// that's why we're using here nextTick and not other types of timeouts
|
||||
if (typeof process !== 'undefined') {
|
||||
process.nextTick(deliverMessages);
|
||||
} else {
|
||||
platform.setImmediate(deliverMessages);
|
||||
}
|
||||
},
|
||||
onLastListenerRemove: () => {
|
||||
hasListeners = false;
|
||||
}
|
||||
});
|
||||
|
||||
return emitter.event;
|
||||
}
|
||||
|
||||
class QueueElement<T> {
|
||||
public readonly data: T;
|
||||
public next: QueueElement<T> | null;
|
||||
|
||||
constructor(data: T) {
|
||||
this.data = data;
|
||||
this.next = null;
|
||||
}
|
||||
}
|
||||
|
||||
class Queue<T> {
|
||||
|
||||
private _first: QueueElement<T> | null;
|
||||
private _last: QueueElement<T> | null;
|
||||
|
||||
constructor() {
|
||||
this._first = null;
|
||||
this._last = null;
|
||||
}
|
||||
|
||||
public peek(): T | null {
|
||||
if (!this._first) {
|
||||
return null;
|
||||
}
|
||||
return this._first.data;
|
||||
}
|
||||
|
||||
public toArray(): T[] {
|
||||
let result: T[] = [], resultLen = 0;
|
||||
let it = this._first;
|
||||
while (it) {
|
||||
result[resultLen++] = it.data;
|
||||
it = it.next;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public pop(): void {
|
||||
if (!this._first) {
|
||||
return;
|
||||
}
|
||||
if (this._first === this._last) {
|
||||
this._first = null;
|
||||
this._last = null;
|
||||
return;
|
||||
}
|
||||
this._first = this._first.next;
|
||||
}
|
||||
|
||||
public push(item: T): void {
|
||||
const element = new QueueElement(item);
|
||||
if (!this._first) {
|
||||
this._first = element;
|
||||
this._last = element;
|
||||
return;
|
||||
}
|
||||
this._last!.next = element;
|
||||
this._last = element;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as Protocol, but will actually track messages and acks.
|
||||
* Moreover, it will ensure no messages are lost if there are no event listeners.
|
||||
*/
|
||||
export class PersistentProtocol {
|
||||
|
||||
private _isReconnecting: boolean;
|
||||
|
||||
private _outgoingUnackMsg: Queue<ProtocolMessage>;
|
||||
private _outgoingMsgId: number;
|
||||
private _outgoingAckId: number;
|
||||
private _outgoingAckTimeout: any | null;
|
||||
|
||||
private _incomingMsgId: number;
|
||||
private _incomingAckId: number;
|
||||
private _incomingMsgLastTime: number;
|
||||
private _incomingAckTimeout: any | null;
|
||||
|
||||
private _outgoingKeepAliveTimeout: any | null;
|
||||
private _incomingKeepAliveTimeout: any | null;
|
||||
|
||||
private _socket: ISocket;
|
||||
private _socketWriter: ProtocolWriter;
|
||||
private _socketReader: ProtocolReader;
|
||||
private _socketDisposables: IDisposable[];
|
||||
|
||||
private _onControlMessage = new Emitter<VSBuffer>();
|
||||
readonly onControlMessage: Event<VSBuffer> = createBufferedEvent(this._onControlMessage.event);
|
||||
|
||||
private _onMessage = new Emitter<VSBuffer>();
|
||||
readonly onMessage: Event<VSBuffer> = createBufferedEvent(this._onMessage.event);
|
||||
|
||||
private _onClose = new Emitter<void>();
|
||||
readonly onClose: Event<void> = createBufferedEvent(this._onClose.event);
|
||||
|
||||
private _onSocketClose = new Emitter<void>();
|
||||
readonly onSocketClose: Event<void> = createBufferedEvent(this._onSocketClose.event);
|
||||
|
||||
private _onSocketTimeout = new Emitter<void>();
|
||||
readonly onSocketTimeout: Event<void> = createBufferedEvent(this._onSocketTimeout.event);
|
||||
|
||||
public get unacknowledgedCount(): number {
|
||||
return this._outgoingMsgId - this._outgoingAckId;
|
||||
}
|
||||
|
||||
constructor(socket: ISocket, initialChunk: VSBuffer | null = null) {
|
||||
this._isReconnecting = false;
|
||||
this._outgoingUnackMsg = new Queue<ProtocolMessage>();
|
||||
this._outgoingMsgId = 0;
|
||||
this._outgoingAckId = 0;
|
||||
this._outgoingAckTimeout = null;
|
||||
|
||||
this._incomingMsgId = 0;
|
||||
this._incomingAckId = 0;
|
||||
this._incomingMsgLastTime = 0;
|
||||
this._incomingAckTimeout = null;
|
||||
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
|
||||
this._socketDisposables = [];
|
||||
this._socket = socket;
|
||||
this._socketWriter = new ProtocolWriter(this._socket);
|
||||
this._socketDisposables.push(this._socketWriter);
|
||||
this._socketReader = new ProtocolReader(this._socket);
|
||||
this._socketDisposables.push(this._socketReader);
|
||||
this._socketDisposables.push(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
|
||||
this._socketDisposables.push(this._socket.onClose(() => this._onSocketClose.fire()));
|
||||
this._socketDisposables.push(this._socket.onEnd(() => this._onClose.fire()));
|
||||
if (initialChunk) {
|
||||
this._socketReader.acceptChunk(initialChunk);
|
||||
}
|
||||
|
||||
this._sendKeepAliveCheck();
|
||||
this._recvKeepAliveCheck();
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
if (this._outgoingAckTimeout) {
|
||||
clearTimeout(this._outgoingAckTimeout);
|
||||
this._outgoingAckTimeout = null;
|
||||
}
|
||||
if (this._incomingAckTimeout) {
|
||||
clearTimeout(this._incomingAckTimeout);
|
||||
this._incomingAckTimeout = null;
|
||||
}
|
||||
if (this._outgoingKeepAliveTimeout) {
|
||||
clearTimeout(this._outgoingKeepAliveTimeout);
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
}
|
||||
if (this._incomingKeepAliveTimeout) {
|
||||
clearTimeout(this._incomingKeepAliveTimeout);
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
}
|
||||
this._socketDisposables = dispose(this._socketDisposables);
|
||||
}
|
||||
|
||||
private _sendKeepAliveCheck(): void {
|
||||
if (this._outgoingKeepAliveTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastOutgoingMsg = Date.now() - this._socketWriter.lastWriteTime;
|
||||
if (timeSinceLastOutgoingMsg >= ProtocolConstants.KeepAliveTime) {
|
||||
// sufficient time has passed since last message was written,
|
||||
// and no message from our side needed to be sent in the meantime,
|
||||
// so we will send a message containing only a keep alive.
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, 0, getEmptyBuffer());
|
||||
this._socketWriter.write(msg);
|
||||
this._sendKeepAliveCheck();
|
||||
return;
|
||||
}
|
||||
|
||||
this._outgoingKeepAliveTimeout = setTimeout(() => {
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
this._sendKeepAliveCheck();
|
||||
}, ProtocolConstants.KeepAliveTime - timeSinceLastOutgoingMsg + 5);
|
||||
}
|
||||
|
||||
private _recvKeepAliveCheck(): void {
|
||||
if (this._incomingKeepAliveTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastIncomingMsg = Date.now() - this._socketReader.lastReadTime;
|
||||
if (timeSinceLastIncomingMsg >= ProtocolConstants.KeepAliveTimeoutTime) {
|
||||
// Trash the socket
|
||||
this._onSocketTimeout.fire(undefined);
|
||||
return;
|
||||
}
|
||||
|
||||
this._incomingKeepAliveTimeout = setTimeout(() => {
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
this._recvKeepAliveCheck();
|
||||
}, ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg + 5);
|
||||
}
|
||||
|
||||
public getSocket(): ISocket {
|
||||
return this._socket;
|
||||
}
|
||||
|
||||
public beginAcceptReconnection(socket: ISocket, initialDataChunk: VSBuffer | null): void {
|
||||
this._isReconnecting = true;
|
||||
|
||||
this._socketDisposables = dispose(this._socketDisposables);
|
||||
|
||||
this._socket = socket;
|
||||
this._socketWriter = new ProtocolWriter(this._socket);
|
||||
this._socketDisposables.push(this._socketWriter);
|
||||
this._socketReader = new ProtocolReader(this._socket);
|
||||
this._socketDisposables.push(this._socketReader);
|
||||
this._socketDisposables.push(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
|
||||
this._socketDisposables.push(this._socket.onClose(() => this._onSocketClose.fire()));
|
||||
this._socketDisposables.push(this._socket.onEnd(() => this._onClose.fire()));
|
||||
this._socketReader.acceptChunk(initialDataChunk);
|
||||
}
|
||||
|
||||
public endAcceptReconnection(): void {
|
||||
this._isReconnecting = false;
|
||||
|
||||
// Send again all unacknowledged messages
|
||||
const toSend = this._outgoingUnackMsg.toArray();
|
||||
for (let i = 0, len = toSend.length; i < len; i++) {
|
||||
this._socketWriter.write(toSend[i]);
|
||||
}
|
||||
this._recvAckCheck();
|
||||
|
||||
this._sendKeepAliveCheck();
|
||||
this._recvKeepAliveCheck();
|
||||
}
|
||||
|
||||
private _receiveMessage(msg: ProtocolMessage): void {
|
||||
if (msg.ack > this._outgoingAckId) {
|
||||
this._outgoingAckId = msg.ack;
|
||||
do {
|
||||
const first = this._outgoingUnackMsg.peek();
|
||||
if (first && first.id <= msg.ack) {
|
||||
// this message has been confirmed, remove it
|
||||
this._outgoingUnackMsg.pop();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
|
||||
if (msg.type === ProtocolMessageType.Regular) {
|
||||
if (msg.id > this._incomingMsgId) {
|
||||
if (msg.id !== this._incomingMsgId + 1) {
|
||||
console.error(`PROTOCOL CORRUPTION, LAST SAW MSG ${this._incomingMsgId} AND HAVE NOW RECEIVED MSG ${msg.id}`);
|
||||
}
|
||||
this._incomingMsgId = msg.id;
|
||||
this._incomingMsgLastTime = Date.now();
|
||||
this._sendAckCheck();
|
||||
this._onMessage.fire(msg.data);
|
||||
}
|
||||
} else if (msg.type === ProtocolMessageType.Control) {
|
||||
this._onControlMessage.fire(msg.data);
|
||||
}
|
||||
}
|
||||
|
||||
readEntireBuffer(): VSBuffer {
|
||||
return this._socketReader.readEntireBuffer();
|
||||
}
|
||||
|
||||
flush(): void {
|
||||
this._socketWriter.flush();
|
||||
}
|
||||
|
||||
send(buffer: VSBuffer): void {
|
||||
const myId = ++this._outgoingMsgId;
|
||||
this._incomingAckId = this._incomingMsgId;
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Regular, myId, this._incomingAckId, buffer);
|
||||
this._outgoingUnackMsg.push(msg);
|
||||
if (!this._isReconnecting) {
|
||||
this._socketWriter.write(msg);
|
||||
this._recvAckCheck();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message which will not be part of the regular acknowledge flow.
|
||||
* Use this for early control messages which are repeated in case of reconnection.
|
||||
*/
|
||||
sendControl(buffer: VSBuffer): void {
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Control, 0, 0, buffer);
|
||||
this._socketWriter.write(msg);
|
||||
}
|
||||
|
||||
private _sendAckCheck(): void {
|
||||
if (this._incomingMsgId <= this._incomingAckId) {
|
||||
// nothink to acknowledge
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._incomingAckTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastIncomingMsg = Date.now() - this._incomingMsgLastTime;
|
||||
if (timeSinceLastIncomingMsg >= ProtocolConstants.AcknowledgeTime) {
|
||||
// sufficient time has passed since this message has been received,
|
||||
// and no message from our side needed to be sent in the meantime,
|
||||
// so we will send a message containing only an ack.
|
||||
this._sendAck();
|
||||
return;
|
||||
}
|
||||
|
||||
this._incomingAckTimeout = setTimeout(() => {
|
||||
this._incomingAckTimeout = null;
|
||||
this._sendAckCheck();
|
||||
}, ProtocolConstants.AcknowledgeTime - timeSinceLastIncomingMsg + 5);
|
||||
}
|
||||
|
||||
private _recvAckCheck(): void {
|
||||
if (this._outgoingMsgId <= this._outgoingAckId) {
|
||||
// everything has been acknowledged
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._outgoingAckTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!;
|
||||
const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime;
|
||||
if (timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.AcknowledgeTimeoutTime) {
|
||||
// Trash the socket
|
||||
this._onSocketTimeout.fire(undefined);
|
||||
return;
|
||||
}
|
||||
|
||||
this._outgoingAckTimeout = setTimeout(() => {
|
||||
this._outgoingAckTimeout = null;
|
||||
this._recvAckCheck();
|
||||
}, ProtocolConstants.AcknowledgeTimeoutTime - timeSinceOldestUnacknowledgedMsg + 5);
|
||||
}
|
||||
|
||||
private _sendAck(): void {
|
||||
if (this._incomingMsgId <= this._incomingAckId) {
|
||||
// nothink to acknowledge
|
||||
return;
|
||||
}
|
||||
|
||||
this._incomingAckId = this._incomingMsgId;
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, getEmptyBuffer());
|
||||
this._socketWriter.write(msg);
|
||||
}
|
||||
}
|
||||
@@ -3,8 +3,13 @@
|
||||
* Licensed under the Source EULA. See License.txt in the project root for license information.
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { CancellationToken } from 'vs/base/common/cancellation';
|
||||
import { Event } from 'vs/base/common/event';
|
||||
import { Event, Emitter, Relay } from 'vs/base/common/event';
|
||||
import { IDisposable, toDisposable, combinedDisposable } from 'vs/base/common/lifecycle';
|
||||
import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/common/async';
|
||||
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
|
||||
import * as errors from 'vs/base/common/errors';
|
||||
import { IServerChannel, IChannel } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
|
||||
/**
|
||||
* An `IChannel` is an abstraction over a collection of commands.
|
||||
@@ -26,3 +31,743 @@ export interface IServerChannel<TContext = string> {
|
||||
call<T>(ctx: TContext, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
|
||||
listen<T>(ctx: TContext, event: string, arg?: any): Event<T>;
|
||||
}
|
||||
|
||||
|
||||
export const enum RequestType {
|
||||
Promise = 100,
|
||||
PromiseCancel = 101,
|
||||
EventListen = 102,
|
||||
EventDispose = 103
|
||||
}
|
||||
|
||||
type IRawPromiseRequest = { type: RequestType.Promise; id: number; channelName: string; name: string; arg: any; };
|
||||
type IRawPromiseCancelRequest = { type: RequestType.PromiseCancel, id: number };
|
||||
type IRawEventListenRequest = { type: RequestType.EventListen; id: number; channelName: string; name: string; arg: any; };
|
||||
type IRawEventDisposeRequest = { type: RequestType.EventDispose, id: number };
|
||||
type IRawRequest = IRawPromiseRequest | IRawPromiseCancelRequest | IRawEventListenRequest | IRawEventDisposeRequest;
|
||||
|
||||
export const enum ResponseType {
|
||||
Initialize = 200,
|
||||
PromiseSuccess = 201,
|
||||
PromiseError = 202,
|
||||
PromiseErrorObj = 203,
|
||||
EventFire = 204
|
||||
}
|
||||
|
||||
type IRawInitializeResponse = { type: ResponseType.Initialize };
|
||||
type IRawPromiseSuccessResponse = { type: ResponseType.PromiseSuccess; id: number; data: any };
|
||||
type IRawPromiseErrorResponse = { type: ResponseType.PromiseError; id: number; data: { message: string, name: string, stack: string[] | undefined } };
|
||||
type IRawPromiseErrorObjResponse = { type: ResponseType.PromiseErrorObj; id: number; data: any };
|
||||
type IRawEventFireResponse = { type: ResponseType.EventFire; id: number; data: any };
|
||||
type IRawResponse = IRawInitializeResponse | IRawPromiseSuccessResponse | IRawPromiseErrorResponse | IRawPromiseErrorObjResponse | IRawEventFireResponse;
|
||||
|
||||
interface IHandler {
|
||||
(response: IRawResponse): void;
|
||||
}
|
||||
|
||||
export interface IMessagePassingProtocol {
|
||||
send(buffer: VSBuffer): void;
|
||||
onMessage: Event<VSBuffer>;
|
||||
}
|
||||
|
||||
enum State {
|
||||
Uninitialized,
|
||||
Idle
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IChannelServer` hosts a collection of channels. You are
|
||||
* able to register channels onto it, provided a channel name.
|
||||
*/
|
||||
export interface IChannelServer<TContext = string> {
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IChannelClient` has access to a collection of channels. You
|
||||
* are able to get those channels, given their channel name.
|
||||
*/
|
||||
export interface IChannelClient {
|
||||
getChannel<T extends IChannel>(channelName: string): T;
|
||||
}
|
||||
|
||||
export interface Client<TContext> {
|
||||
readonly ctx: TContext;
|
||||
}
|
||||
|
||||
export interface IConnectionHub<TContext> {
|
||||
readonly connections: Connection<TContext>[];
|
||||
readonly onDidChangeConnections: Event<Connection<TContext>>;
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IClientRouter` is responsible for routing calls to specific
|
||||
* channels, in scenarios in which there are multiple possible
|
||||
* channels (each from a separate client) to pick from.
|
||||
*/
|
||||
export interface IClientRouter<TContext = string> {
|
||||
routeCall(hub: IConnectionHub<TContext>, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<Client<TContext>>;
|
||||
routeEvent(hub: IConnectionHub<TContext>, event: string, arg?: any): Promise<Client<TContext>>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to the `IChannelClient`, you can get channels from this
|
||||
* collection of channels. The difference being that in the
|
||||
* `IRoutingChannelClient`, there are multiple clients providing
|
||||
* the same channel. You'll need to pass in an `IClientRouter` in
|
||||
* order to pick the right one.
|
||||
*/
|
||||
export interface IRoutingChannelClient<TContext = string> {
|
||||
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
|
||||
}
|
||||
|
||||
interface IReader {
|
||||
read(bytes: number): VSBuffer;
|
||||
}
|
||||
|
||||
interface IWriter {
|
||||
write(buffer: VSBuffer): void;
|
||||
}
|
||||
|
||||
class BufferReader implements IReader {
|
||||
|
||||
private pos = 0;
|
||||
|
||||
constructor(private buffer: VSBuffer) { }
|
||||
|
||||
read(bytes: number): VSBuffer {
|
||||
const result = this.buffer.slice(this.pos, this.pos + bytes);
|
||||
this.pos += result.byteLength;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
class BufferWriter implements IWriter {
|
||||
|
||||
private buffers: VSBuffer[] = [];
|
||||
|
||||
get buffer(): VSBuffer {
|
||||
return VSBuffer.concat(this.buffers);
|
||||
}
|
||||
|
||||
write(buffer: VSBuffer): void {
|
||||
this.buffers.push(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
enum DataType {
|
||||
Undefined = 0,
|
||||
String = 1,
|
||||
Buffer = 2,
|
||||
VSBuffer = 3,
|
||||
Array = 4,
|
||||
Object = 5
|
||||
}
|
||||
|
||||
function createSizeBuffer(size: number): VSBuffer {
|
||||
const result = VSBuffer.alloc(4);
|
||||
result.writeUint32BE(size, 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
function readSizeBuffer(reader: IReader): number {
|
||||
return reader.read(4).readUint32BE(0);
|
||||
}
|
||||
|
||||
function createOneByteBuffer(value: number): VSBuffer {
|
||||
const result = VSBuffer.alloc(1);
|
||||
result.writeUint8(value, 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
const BufferPresets = {
|
||||
Undefined: createOneByteBuffer(DataType.Undefined),
|
||||
String: createOneByteBuffer(DataType.String),
|
||||
Buffer: createOneByteBuffer(DataType.Buffer),
|
||||
VSBuffer: createOneByteBuffer(DataType.VSBuffer),
|
||||
Array: createOneByteBuffer(DataType.Array),
|
||||
Object: createOneByteBuffer(DataType.Object),
|
||||
};
|
||||
|
||||
declare var Buffer: any;
|
||||
const hasBuffer = (typeof Buffer !== 'undefined');
|
||||
|
||||
function serialize(writer: IWriter, data: any): void {
|
||||
if (typeof data === 'undefined') {
|
||||
writer.write(BufferPresets.Undefined);
|
||||
} else if (typeof data === 'string') {
|
||||
const buffer = VSBuffer.fromString(data);
|
||||
writer.write(BufferPresets.String);
|
||||
writer.write(createSizeBuffer(buffer.byteLength));
|
||||
writer.write(buffer);
|
||||
} else if (hasBuffer && Buffer.isBuffer(data)) {
|
||||
const buffer = VSBuffer.wrap(data);
|
||||
writer.write(BufferPresets.Buffer);
|
||||
writer.write(createSizeBuffer(buffer.byteLength));
|
||||
writer.write(buffer);
|
||||
} else if (data instanceof VSBuffer) {
|
||||
writer.write(BufferPresets.VSBuffer);
|
||||
writer.write(createSizeBuffer(data.byteLength));
|
||||
writer.write(data);
|
||||
} else if (Array.isArray(data)) {
|
||||
writer.write(BufferPresets.Array);
|
||||
writer.write(createSizeBuffer(data.length));
|
||||
|
||||
for (const el of data) {
|
||||
serialize(writer, el);
|
||||
}
|
||||
} else {
|
||||
const buffer = VSBuffer.fromString(JSON.stringify(data));
|
||||
writer.write(BufferPresets.Object);
|
||||
writer.write(createSizeBuffer(buffer.byteLength));
|
||||
writer.write(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
function deserialize(reader: IReader): any {
|
||||
const type = reader.read(1).readUint8(0);
|
||||
|
||||
switch (type) {
|
||||
case DataType.Undefined: return undefined;
|
||||
case DataType.String: return reader.read(readSizeBuffer(reader)).toString();
|
||||
case DataType.Buffer: return reader.read(readSizeBuffer(reader)).buffer;
|
||||
case DataType.VSBuffer: return reader.read(readSizeBuffer(reader));
|
||||
case DataType.Array: {
|
||||
const length = readSizeBuffer(reader);
|
||||
const result: any[] = [];
|
||||
|
||||
for (let i = 0; i < length; i++) {
|
||||
result.push(deserialize(reader));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
case DataType.Object: return JSON.parse(reader.read(readSizeBuffer(reader)).toString());
|
||||
}
|
||||
}
|
||||
|
||||
export class ChannelServer<TContext = string> implements IChannelServer<TContext>, IDisposable {
|
||||
|
||||
private channels = new Map<string, IServerChannel<TContext>>();
|
||||
private activeRequests = new Map<number, IDisposable>();
|
||||
private protocolListener: IDisposable | null;
|
||||
|
||||
constructor(private protocol: IMessagePassingProtocol, private ctx: TContext) {
|
||||
this.protocolListener = this.protocol.onMessage(msg => this.onRawMessage(msg));
|
||||
this.sendResponse({ type: ResponseType.Initialize });
|
||||
}
|
||||
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
|
||||
this.channels.set(channelName, channel);
|
||||
}
|
||||
|
||||
private sendResponse(response: IRawResponse): void {
|
||||
switch (response.type) {
|
||||
case ResponseType.Initialize:
|
||||
return this.send([response.type]);
|
||||
|
||||
case ResponseType.PromiseSuccess:
|
||||
case ResponseType.PromiseError:
|
||||
case ResponseType.EventFire:
|
||||
case ResponseType.PromiseErrorObj:
|
||||
return this.send([response.type, response.id], response.data);
|
||||
}
|
||||
}
|
||||
|
||||
private send(header: any, body: any = undefined): void {
|
||||
const writer = new BufferWriter();
|
||||
serialize(writer, header);
|
||||
serialize(writer, body);
|
||||
this.sendBuffer(writer.buffer);
|
||||
}
|
||||
|
||||
private sendBuffer(message: VSBuffer): void {
|
||||
try {
|
||||
this.protocol.send(message);
|
||||
} catch (err) {
|
||||
// noop
|
||||
}
|
||||
}
|
||||
|
||||
private onRawMessage(message: VSBuffer): void {
|
||||
const reader = new BufferReader(message);
|
||||
const header = deserialize(reader);
|
||||
const body = deserialize(reader);
|
||||
const type = header[0] as RequestType;
|
||||
|
||||
switch (type) {
|
||||
case RequestType.Promise:
|
||||
return this.onPromise({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
|
||||
case RequestType.EventListen:
|
||||
return this.onEventListen({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
|
||||
case RequestType.PromiseCancel:
|
||||
return this.disposeActiveRequest({ type, id: header[1] });
|
||||
case RequestType.EventDispose:
|
||||
return this.disposeActiveRequest({ type, id: header[1] });
|
||||
}
|
||||
}
|
||||
|
||||
private onPromise(request: IRawPromiseRequest): void {
|
||||
const channel = this.channels.get(request.channelName);
|
||||
if (!channel) {
|
||||
throw new Error('Unknown channel');
|
||||
}
|
||||
const cancellationTokenSource = new CancellationTokenSource();
|
||||
let promise: Promise<any>;
|
||||
|
||||
try {
|
||||
promise = channel.call(this.ctx, request.name, request.arg, cancellationTokenSource.token);
|
||||
} catch (err) {
|
||||
promise = Promise.reject(err);
|
||||
}
|
||||
|
||||
const id = request.id;
|
||||
|
||||
promise.then(data => {
|
||||
this.sendResponse(<IRawResponse>{ id, data, type: ResponseType.PromiseSuccess });
|
||||
this.activeRequests.delete(request.id);
|
||||
}, err => {
|
||||
if (err instanceof Error) {
|
||||
this.sendResponse(<IRawResponse>{
|
||||
id, data: {
|
||||
message: err.message,
|
||||
name: err.name,
|
||||
stack: err.stack ? (err.stack.split ? err.stack.split('\n') : err.stack) : undefined
|
||||
}, type: ResponseType.PromiseError
|
||||
});
|
||||
} else {
|
||||
this.sendResponse(<IRawResponse>{ id, data: err, type: ResponseType.PromiseErrorObj });
|
||||
}
|
||||
|
||||
this.activeRequests.delete(request.id);
|
||||
});
|
||||
|
||||
const disposable = toDisposable(() => cancellationTokenSource.cancel());
|
||||
this.activeRequests.set(request.id, disposable);
|
||||
}
|
||||
|
||||
private onEventListen(request: IRawEventListenRequest): void {
|
||||
const channel = this.channels.get(request.channelName);
|
||||
if (!channel) {
|
||||
throw new Error('Unknown channel');
|
||||
}
|
||||
|
||||
const id = request.id;
|
||||
const event = channel.listen(this.ctx, request.name, request.arg);
|
||||
const disposable = event(data => this.sendResponse(<IRawResponse>{ id, data, type: ResponseType.EventFire }));
|
||||
|
||||
this.activeRequests.set(request.id, disposable);
|
||||
}
|
||||
|
||||
private disposeActiveRequest(request: IRawRequest): void {
|
||||
const disposable = this.activeRequests.get(request.id);
|
||||
|
||||
if (disposable) {
|
||||
disposable.dispose();
|
||||
this.activeRequests.delete(request.id);
|
||||
}
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
if (this.protocolListener) {
|
||||
this.protocolListener.dispose();
|
||||
this.protocolListener = null;
|
||||
}
|
||||
this.activeRequests.forEach(d => d.dispose());
|
||||
this.activeRequests.clear();
|
||||
}
|
||||
}
|
||||
|
||||
export class ChannelClient implements IChannelClient, IDisposable {
|
||||
|
||||
private state: State = State.Uninitialized;
|
||||
private activeRequests = new Set<IDisposable>();
|
||||
private handlers = new Map<number, IHandler>();
|
||||
private lastRequestId: number = 0;
|
||||
private protocolListener: IDisposable | null;
|
||||
|
||||
private _onDidInitialize = new Emitter<void>();
|
||||
readonly onDidInitialize = this._onDidInitialize.event;
|
||||
|
||||
constructor(private protocol: IMessagePassingProtocol) {
|
||||
this.protocolListener = this.protocol.onMessage(msg => this.onBuffer(msg));
|
||||
}
|
||||
|
||||
getChannel<T extends IChannel>(channelName: string): T {
|
||||
const that = this;
|
||||
|
||||
return {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken) {
|
||||
return that.requestPromise(channelName, command, arg, cancellationToken);
|
||||
},
|
||||
listen(event: string, arg: any) {
|
||||
return that.requestEvent(channelName, event, arg);
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
private requestPromise(channelName: string, name: string, arg?: any, cancellationToken = CancellationToken.None): Promise<any> {
|
||||
const id = this.lastRequestId++;
|
||||
const type = RequestType.Promise;
|
||||
const request: IRawRequest = { id, type, channelName, name, arg };
|
||||
|
||||
if (cancellationToken.isCancellationRequested) {
|
||||
return Promise.reject(errors.canceled());
|
||||
}
|
||||
|
||||
let disposable: IDisposable;
|
||||
|
||||
const result = new Promise((c, e) => {
|
||||
if (cancellationToken.isCancellationRequested) {
|
||||
return e(errors.canceled());
|
||||
}
|
||||
|
||||
let uninitializedPromise: CancelablePromise<void> | null = createCancelablePromise(_ => this.whenInitialized());
|
||||
uninitializedPromise.then(() => {
|
||||
uninitializedPromise = null;
|
||||
|
||||
const handler: IHandler = response => {
|
||||
switch (response.type) {
|
||||
case ResponseType.PromiseSuccess:
|
||||
this.handlers.delete(id);
|
||||
c(response.data);
|
||||
break;
|
||||
|
||||
case ResponseType.PromiseError:
|
||||
this.handlers.delete(id);
|
||||
const error = new Error(response.data.message);
|
||||
(<any>error).stack = response.data.stack;
|
||||
error.name = response.data.name;
|
||||
e(error);
|
||||
break;
|
||||
|
||||
case ResponseType.PromiseErrorObj:
|
||||
this.handlers.delete(id);
|
||||
e(response.data);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
this.handlers.set(id, handler);
|
||||
this.sendRequest(request);
|
||||
});
|
||||
|
||||
const cancel = () => {
|
||||
if (uninitializedPromise) {
|
||||
uninitializedPromise.cancel();
|
||||
uninitializedPromise = null;
|
||||
} else {
|
||||
this.sendRequest({ id, type: RequestType.PromiseCancel });
|
||||
}
|
||||
|
||||
e(errors.canceled());
|
||||
};
|
||||
|
||||
const cancellationTokenListener = cancellationToken.onCancellationRequested(cancel);
|
||||
disposable = combinedDisposable([toDisposable(cancel), cancellationTokenListener]);
|
||||
this.activeRequests.add(disposable);
|
||||
});
|
||||
|
||||
return result.finally(() => this.activeRequests.delete(disposable));
|
||||
}
|
||||
|
||||
private requestEvent(channelName: string, name: string, arg?: any): Event<any> {
|
||||
const id = this.lastRequestId++;
|
||||
const type = RequestType.EventListen;
|
||||
const request: IRawRequest = { id, type, channelName, name, arg };
|
||||
|
||||
let uninitializedPromise: CancelablePromise<void> | null = null;
|
||||
|
||||
const emitter = new Emitter<any>({
|
||||
onFirstListenerAdd: () => {
|
||||
uninitializedPromise = createCancelablePromise(_ => this.whenInitialized());
|
||||
uninitializedPromise.then(() => {
|
||||
uninitializedPromise = null;
|
||||
this.activeRequests.add(emitter);
|
||||
this.sendRequest(request);
|
||||
});
|
||||
},
|
||||
onLastListenerRemove: () => {
|
||||
if (uninitializedPromise) {
|
||||
uninitializedPromise.cancel();
|
||||
uninitializedPromise = null;
|
||||
} else {
|
||||
this.activeRequests.delete(emitter);
|
||||
this.sendRequest({ id, type: RequestType.EventDispose });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const handler: IHandler = (res: IRawEventFireResponse) => emitter.fire(res.data);
|
||||
this.handlers.set(id, handler);
|
||||
|
||||
return emitter.event;
|
||||
}
|
||||
|
||||
private sendRequest(request: IRawRequest): void {
|
||||
switch (request.type) {
|
||||
case RequestType.Promise:
|
||||
case RequestType.EventListen:
|
||||
return this.send([request.type, request.id, request.channelName, request.name], request.arg);
|
||||
|
||||
case RequestType.PromiseCancel:
|
||||
case RequestType.EventDispose:
|
||||
return this.send([request.type, request.id]);
|
||||
}
|
||||
}
|
||||
|
||||
private send(header: any, body: any = undefined): void {
|
||||
const writer = new BufferWriter();
|
||||
serialize(writer, header);
|
||||
serialize(writer, body);
|
||||
this.sendBuffer(writer.buffer);
|
||||
}
|
||||
|
||||
private sendBuffer(message: VSBuffer): void {
|
||||
try {
|
||||
this.protocol.send(message);
|
||||
} catch (err) {
|
||||
// noop
|
||||
}
|
||||
}
|
||||
|
||||
private onBuffer(message: VSBuffer): void {
|
||||
const reader = new BufferReader(message);
|
||||
const header = deserialize(reader);
|
||||
const body = deserialize(reader);
|
||||
const type: ResponseType = header[0];
|
||||
|
||||
switch (type) {
|
||||
case ResponseType.Initialize:
|
||||
return this.onResponse({ type: header[0] });
|
||||
|
||||
case ResponseType.PromiseSuccess:
|
||||
case ResponseType.PromiseError:
|
||||
case ResponseType.EventFire:
|
||||
case ResponseType.PromiseErrorObj:
|
||||
return this.onResponse({ type: header[0], id: header[1], data: body });
|
||||
}
|
||||
}
|
||||
|
||||
private onResponse(response: IRawResponse): void {
|
||||
if (response.type === ResponseType.Initialize) {
|
||||
this.state = State.Idle;
|
||||
this._onDidInitialize.fire();
|
||||
return;
|
||||
}
|
||||
|
||||
const handler = this.handlers.get(response.id);
|
||||
|
||||
if (handler) {
|
||||
handler(response);
|
||||
}
|
||||
}
|
||||
|
||||
private whenInitialized(): Promise<void> {
|
||||
if (this.state === State.Idle) {
|
||||
return Promise.resolve();
|
||||
} else {
|
||||
return Event.toPromise(this.onDidInitialize);
|
||||
}
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
if (this.protocolListener) {
|
||||
this.protocolListener.dispose();
|
||||
this.protocolListener = null;
|
||||
}
|
||||
this.activeRequests.forEach(p => p.dispose());
|
||||
this.activeRequests.clear();
|
||||
}
|
||||
}
|
||||
|
||||
export interface ClientConnectionEvent {
|
||||
protocol: IMessagePassingProtocol;
|
||||
onDidClientDisconnect: Event<void>;
|
||||
}
|
||||
|
||||
interface Connection<TContext> extends Client<TContext> {
|
||||
readonly channelClient: ChannelClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IPCServer` is both a channel server and a routing channel
|
||||
* client.
|
||||
*
|
||||
* As the owner of a protocol, you should extend both this
|
||||
* and the `IPCClient` classes to get IPC implementations
|
||||
* for your protocol.
|
||||
*/
|
||||
export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable {
|
||||
|
||||
private channels = new Map<string, IServerChannel<TContext>>();
|
||||
private _connections = new Set<Connection<TContext>>();
|
||||
|
||||
private _onDidChangeConnections = new Emitter<Connection<TContext>>();
|
||||
readonly onDidChangeConnections: Event<Connection<TContext>> = this._onDidChangeConnections.event;
|
||||
|
||||
get connections(): Connection<TContext>[] {
|
||||
const result: Connection<TContext>[] = [];
|
||||
this._connections.forEach(ctx => result.push(ctx));
|
||||
return result;
|
||||
}
|
||||
|
||||
constructor(onDidClientConnect: Event<ClientConnectionEvent>) {
|
||||
onDidClientConnect(({ protocol, onDidClientDisconnect }) => {
|
||||
const onFirstMessage = Event.once(protocol.onMessage);
|
||||
|
||||
onFirstMessage(msg => {
|
||||
const reader = new BufferReader(msg);
|
||||
const ctx = deserialize(reader) as TContext;
|
||||
|
||||
const channelServer = new ChannelServer(protocol, ctx);
|
||||
const channelClient = new ChannelClient(protocol);
|
||||
|
||||
this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel));
|
||||
|
||||
const connection: Connection<TContext> = { channelClient, ctx };
|
||||
this._connections.add(connection);
|
||||
this._onDidChangeConnections.fire(connection);
|
||||
|
||||
onDidClientDisconnect(() => {
|
||||
channelServer.dispose();
|
||||
channelClient.dispose();
|
||||
this._connections.delete(connection);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T {
|
||||
const that = this;
|
||||
|
||||
return {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken) {
|
||||
const channelPromise = router.routeCall(that, command, arg)
|
||||
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
|
||||
|
||||
return getDelayedChannel(channelPromise)
|
||||
.call(command, arg, cancellationToken);
|
||||
},
|
||||
listen(event: string, arg: any) {
|
||||
const channelPromise = router.routeEvent(that, event, arg)
|
||||
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
|
||||
|
||||
return getDelayedChannel(channelPromise)
|
||||
.listen(event, arg);
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
|
||||
this.channels.set(channelName, channel);
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
this.channels.clear();
|
||||
this._connections.clear();
|
||||
this._onDidChangeConnections.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IPCClient` is both a channel client and a channel server.
|
||||
*
|
||||
* As the owner of a protocol, you should extend both this
|
||||
* and the `IPCClient` classes to get IPC implementations
|
||||
* for your protocol.
|
||||
*/
|
||||
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable {
|
||||
|
||||
private channelClient: ChannelClient;
|
||||
private channelServer: ChannelServer<TContext>;
|
||||
|
||||
constructor(protocol: IMessagePassingProtocol, ctx: TContext) {
|
||||
const writer = new BufferWriter();
|
||||
serialize(writer, ctx);
|
||||
protocol.send(writer.buffer);
|
||||
|
||||
this.channelClient = new ChannelClient(protocol);
|
||||
this.channelServer = new ChannelServer(protocol, ctx);
|
||||
}
|
||||
|
||||
getChannel<T extends IChannel>(channelName: string): T {
|
||||
return this.channelClient.getChannel(channelName) as T;
|
||||
}
|
||||
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
|
||||
this.channelServer.registerChannel(channelName, channel);
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
this.channelClient.dispose();
|
||||
this.channelServer.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
export function getDelayedChannel<T extends IChannel>(promise: Promise<T>): T {
|
||||
return {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
|
||||
return promise.then(c => c.call<T>(command, arg, cancellationToken));
|
||||
},
|
||||
|
||||
listen<T>(event: string, arg?: any): Event<T> {
|
||||
const relay = new Relay<any>();
|
||||
promise.then(c => relay.input = c.listen(event, arg));
|
||||
return relay.event;
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
export function getNextTickChannel<T extends IChannel>(channel: T): T {
|
||||
let didTick = false;
|
||||
|
||||
return {
|
||||
call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
|
||||
if (didTick) {
|
||||
return channel.call(command, arg, cancellationToken);
|
||||
}
|
||||
|
||||
return timeout(0)
|
||||
.then(() => didTick = true)
|
||||
.then(() => channel.call<T>(command, arg, cancellationToken));
|
||||
},
|
||||
listen<T>(event: string, arg?: any): Event<T> {
|
||||
if (didTick) {
|
||||
return channel.listen<T>(event, arg);
|
||||
}
|
||||
|
||||
const relay = new Relay<T>();
|
||||
|
||||
timeout(0)
|
||||
.then(() => didTick = true)
|
||||
.then(() => relay.input = channel.listen<T>(event, arg));
|
||||
|
||||
return relay.event;
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
export class StaticRouter<TContext = string> implements IClientRouter<TContext> {
|
||||
|
||||
constructor(private fn: (ctx: TContext) => boolean | Promise<boolean>) { }
|
||||
|
||||
routeCall(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
|
||||
return this.route(hub);
|
||||
}
|
||||
|
||||
routeEvent(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
|
||||
return this.route(hub);
|
||||
}
|
||||
|
||||
private async route(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
|
||||
for (const connection of hub.connections) {
|
||||
if (await Promise.resolve(this.fn(connection.ctx))) {
|
||||
return Promise.resolve(connection);
|
||||
}
|
||||
}
|
||||
|
||||
await Event.toPromise(hub.onDidChangeConnections);
|
||||
return await this.route(hub);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,17 +4,18 @@
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { Event } from 'vs/base/common/event';
|
||||
import { IPCClient } from 'vs/base/parts/ipc/node/ipc';
|
||||
import { IPCClient } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { Protocol } from 'vs/base/parts/ipc/node/ipc.electron';
|
||||
import { ipcRenderer } from 'electron';
|
||||
import { IDisposable } from 'vs/base/common/lifecycle';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
|
||||
export class Client extends IPCClient implements IDisposable {
|
||||
|
||||
private protocol: Protocol;
|
||||
|
||||
private static createProtocol(): Protocol {
|
||||
const onMessage = Event.fromNodeEventEmitter<Buffer>(ipcRenderer, 'ipc:message', (_, message: Buffer) => message);
|
||||
const onMessage = Event.fromNodeEventEmitter<VSBuffer>(ipcRenderer, 'ipc:message', (_, message: Buffer) => VSBuffer.wrap(message));
|
||||
ipcRenderer.send('ipc:hello');
|
||||
return new Protocol(ipcRenderer, onMessage);
|
||||
}
|
||||
|
||||
@@ -4,20 +4,22 @@
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { Event, Emitter } from 'vs/base/common/event';
|
||||
import { IPCServer, ClientConnectionEvent } from 'vs/base/parts/ipc/node/ipc';
|
||||
import { IPCServer, ClientConnectionEvent } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { Protocol } from 'vs/base/parts/ipc/node/ipc.electron';
|
||||
import { ipcMain } from 'electron';
|
||||
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
|
||||
interface IIPCEvent {
|
||||
event: { sender: Electron.WebContents; };
|
||||
message: Buffer | null;
|
||||
}
|
||||
|
||||
function createScopedOnMessageEvent(senderId: number, eventName: string): Event<Buffer | null> {
|
||||
function createScopedOnMessageEvent(senderId: number, eventName: string): Event<VSBuffer | null> {
|
||||
const onMessage = Event.fromNodeEventEmitter<IIPCEvent>(ipcMain, eventName, (event, message) => ({ event, message }));
|
||||
const onMessageFromSender = Event.filter(onMessage, ({ event }) => event.sender.id === senderId);
|
||||
return Event.map(onMessageFromSender, ({ message }) => message);
|
||||
// {{SQL CARBON EDIT}} cast message as null since typescript isn't saying its always null
|
||||
return Event.map(onMessageFromSender, ({ message }) => message ? VSBuffer.wrap(message) : message as null);
|
||||
}
|
||||
|
||||
export class Server extends IPCServer {
|
||||
@@ -38,7 +40,7 @@ export class Server extends IPCServer {
|
||||
const onDidClientReconnect = new Emitter<void>();
|
||||
Server.Clients.set(id, toDisposable(() => onDidClientReconnect.fire()));
|
||||
|
||||
const onMessage = createScopedOnMessageEvent(id, 'ipc:message') as Event<Buffer>;
|
||||
const onMessage = createScopedOnMessageEvent(id, 'ipc:message') as Event<VSBuffer>;
|
||||
const onDidClientDisconnect = Event.any(Event.signal(createScopedOnMessageEvent(id, 'ipc:disconnect')), onDidClientReconnect.event);
|
||||
const protocol = new Protocol(webContents, onMessage);
|
||||
|
||||
@@ -49,4 +51,4 @@ export class Server extends IPCServer {
|
||||
constructor() {
|
||||
super(Server.getOnDidClientConnect());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,11 +9,11 @@ import { Delayer, createCancelablePromise } from 'vs/base/common/async';
|
||||
import { deepClone, assign } from 'vs/base/common/objects';
|
||||
import { Emitter, Event } from 'vs/base/common/event';
|
||||
import { createQueuedSender } from 'vs/base/node/processes';
|
||||
import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelClient } from 'vs/base/parts/ipc/node/ipc';
|
||||
import { IChannel, ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelClient } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { isRemoteConsoleLog, log } from 'vs/base/common/console';
|
||||
import { CancellationToken } from 'vs/base/common/cancellation';
|
||||
import * as errors from 'vs/base/common/errors';
|
||||
import { IChannel } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
|
||||
/**
|
||||
* This implementation doesn't perform well since it uses base64 encoding for buffers.
|
||||
@@ -26,11 +26,11 @@ export class Server<TContext extends string> extends IPCServer<TContext> {
|
||||
send: r => {
|
||||
try {
|
||||
if (process.send) {
|
||||
process.send(r.toString('base64'));
|
||||
process.send((<Buffer>r.buffer).toString('base64'));
|
||||
}
|
||||
} catch (e) { /* not much to do */ }
|
||||
},
|
||||
onMessage: Event.fromNodeEventEmitter(process, 'message', msg => Buffer.from(msg, 'base64'))
|
||||
onMessage: Event.fromNodeEventEmitter(process, 'message', msg => VSBuffer.wrap(Buffer.from(msg, 'base64')))
|
||||
}, ctx);
|
||||
|
||||
process.once('disconnect', () => this.dispose());
|
||||
@@ -199,7 +199,7 @@ export class Client implements IChannelClient, IDisposable {
|
||||
|
||||
this.child = fork(this.modulePath, args, forkOpts);
|
||||
|
||||
const onMessageEmitter = new Emitter<Buffer>();
|
||||
const onMessageEmitter = new Emitter<VSBuffer>();
|
||||
const onRawMessage = Event.fromNodeEventEmitter(this.child, 'message', msg => msg);
|
||||
|
||||
onRawMessage(msg => {
|
||||
@@ -211,11 +211,11 @@ export class Client implements IChannelClient, IDisposable {
|
||||
}
|
||||
|
||||
// Anything else goes to the outside
|
||||
onMessageEmitter.fire(Buffer.from(msg, 'base64'));
|
||||
onMessageEmitter.fire(VSBuffer.wrap(Buffer.from(msg, 'base64')));
|
||||
});
|
||||
|
||||
const sender = this.options.useQueue ? createQueuedSender(this.child) : this.child;
|
||||
const send = (r: Buffer) => this.child && this.child.connected && sender.send(r.toString('base64'));
|
||||
const send = (r: VSBuffer) => this.child && this.child.connected && sender.send((<Buffer>r.buffer).toString('base64'));
|
||||
const onMessage = onMessageEmitter.event;
|
||||
const protocol = { send, onMessage };
|
||||
|
||||
|
||||
@@ -3,8 +3,9 @@
|
||||
* Licensed under the Source EULA. See License.txt in the project root for license information.
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
|
||||
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { Event } from 'vs/base/common/event';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
|
||||
export interface Sender {
|
||||
send(channel: string, msg: Buffer | null): void;
|
||||
@@ -12,11 +13,11 @@ export interface Sender {
|
||||
|
||||
export class Protocol implements IMessagePassingProtocol {
|
||||
|
||||
constructor(private sender: Sender, readonly onMessage: Event<Buffer>) { }
|
||||
constructor(private sender: Sender, readonly onMessage: Event<VSBuffer>) { }
|
||||
|
||||
send(message: Buffer): void {
|
||||
send(message: VSBuffer): void {
|
||||
try {
|
||||
this.sender.send('ipc:message', message);
|
||||
this.sender.send('ipc:message', (<Buffer>message.buffer));
|
||||
} catch (e) {
|
||||
// systems are going down
|
||||
}
|
||||
|
||||
@@ -4,13 +4,62 @@
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { Socket, Server as NetServer, createConnection, createServer } from 'net';
|
||||
import { Event, Emitter } from 'vs/base/common/event';
|
||||
import { IMessagePassingProtocol, ClientConnectionEvent, IPCServer, IPCClient } from 'vs/base/parts/ipc/node/ipc';
|
||||
import { Event } from 'vs/base/common/event';
|
||||
import { ClientConnectionEvent, IPCServer } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { join } from 'vs/base/common/path';
|
||||
import { tmpdir } from 'os';
|
||||
import * as fs from 'fs';
|
||||
import { generateUuid } from 'vs/base/common/uuid';
|
||||
import { IDisposable } from 'vs/base/common/lifecycle';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
import { ISocket, Protocol, Client } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
|
||||
export class NodeSocket implements ISocket {
|
||||
public readonly socket: Socket;
|
||||
|
||||
constructor(socket: Socket) {
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
public onData(_listener: (e: VSBuffer) => void): IDisposable {
|
||||
const listener = (buff: Buffer) => _listener(VSBuffer.wrap(buff));
|
||||
this.socket.on('data', listener);
|
||||
return {
|
||||
dispose: () => this.socket.off('data', listener)
|
||||
};
|
||||
}
|
||||
|
||||
public onClose(listener: () => void): IDisposable {
|
||||
this.socket.on('close', listener);
|
||||
return {
|
||||
dispose: () => this.socket.off('close', listener)
|
||||
};
|
||||
}
|
||||
|
||||
public onEnd(listener: () => void): IDisposable {
|
||||
this.socket.on('end', listener);
|
||||
return {
|
||||
dispose: () => this.socket.off('end', listener)
|
||||
};
|
||||
}
|
||||
|
||||
public write(buffer: VSBuffer): void {
|
||||
// return early if socket has been destroyed in the meantime
|
||||
if (this.socket.destroyed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// we ignore the returned value from `write` because we would have to cached the data
|
||||
// anyways and nodejs is already doing that for us:
|
||||
// > https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback
|
||||
// > However, the false return value is only advisory and the writable stream will unconditionally
|
||||
// > accept and buffer chunk even if it has not not been allowed to drain.
|
||||
this.socket.write(<Buffer>buffer.buffer);
|
||||
}
|
||||
|
||||
public end(): void {
|
||||
this.socket.end();
|
||||
}
|
||||
}
|
||||
|
||||
export function generateRandomPipeName(): string {
|
||||
const randomSuffix = generateUuid();
|
||||
@@ -22,386 +71,13 @@ export function generateRandomPipeName(): string {
|
||||
}
|
||||
}
|
||||
|
||||
function log(fd: number, msg: string, data?: Buffer): void {
|
||||
const date = new Date();
|
||||
fs.writeSync(fd, `[${date.getHours()}:${date.getMinutes()}:${date.getSeconds()}.${date.getMilliseconds()}] ${msg}\n`);
|
||||
if (data) {
|
||||
fs.writeSync(fd, data);
|
||||
fs.writeSync(fd, `\n---------------------------------------------------------------------------------------------------------\n`);
|
||||
}
|
||||
fs.fdatasyncSync(fd);
|
||||
}
|
||||
|
||||
const EMPTY_BUFFER = Buffer.allocUnsafe(0);
|
||||
|
||||
class ChunkStream {
|
||||
|
||||
private _chunks: Buffer[];
|
||||
private _totalLength: number;
|
||||
|
||||
public get byteLength() {
|
||||
return this._totalLength;
|
||||
}
|
||||
|
||||
constructor() {
|
||||
this._chunks = [];
|
||||
this._totalLength = 0;
|
||||
}
|
||||
|
||||
public acceptChunk(buff: Buffer) {
|
||||
this._chunks.push(buff);
|
||||
this._totalLength += buff.byteLength;
|
||||
}
|
||||
|
||||
public read(byteCount: number): Buffer {
|
||||
if (byteCount === 0) {
|
||||
return EMPTY_BUFFER;
|
||||
}
|
||||
|
||||
if (byteCount > this._totalLength) {
|
||||
throw new Error(`Cannot read so many bytes!`);
|
||||
}
|
||||
|
||||
if (this._chunks[0].byteLength === byteCount) {
|
||||
// super fast path, precisely first chunk must be returned
|
||||
const result = this._chunks.shift()!;
|
||||
this._totalLength -= byteCount;
|
||||
return result;
|
||||
}
|
||||
|
||||
if (this._chunks[0].byteLength > byteCount) {
|
||||
// fast path, the reading is entirely within the first chunk
|
||||
const result = this._chunks[0].slice(0, byteCount);
|
||||
this._chunks[0] = this._chunks[0].slice(byteCount);
|
||||
this._totalLength -= byteCount;
|
||||
return result;
|
||||
}
|
||||
|
||||
let result = Buffer.allocUnsafe(byteCount);
|
||||
let resultOffset = 0;
|
||||
while (byteCount > 0) {
|
||||
const chunk = this._chunks[0];
|
||||
if (chunk.byteLength > byteCount) {
|
||||
// this chunk will survive
|
||||
this._chunks[0] = chunk.slice(byteCount);
|
||||
|
||||
chunk.copy(result, resultOffset, 0, byteCount);
|
||||
resultOffset += byteCount;
|
||||
this._totalLength -= byteCount;
|
||||
byteCount -= byteCount;
|
||||
} else {
|
||||
// this chunk will be entirely read
|
||||
this._chunks.shift();
|
||||
|
||||
chunk.copy(result, resultOffset, 0, chunk.byteLength);
|
||||
resultOffset += chunk.byteLength;
|
||||
this._totalLength -= chunk.byteLength;
|
||||
byteCount -= chunk.byteLength;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
const enum ProtocolMessageType {
|
||||
None = 0,
|
||||
Regular = 1,
|
||||
Control = 2,
|
||||
Ack = 3,
|
||||
KeepAlive = 4
|
||||
}
|
||||
|
||||
function ProtocolMessageTypeToString(type: ProtocolMessageType): string {
|
||||
switch (type) {
|
||||
case ProtocolMessageType.None: return 'None';
|
||||
case ProtocolMessageType.Regular: return 'Regular';
|
||||
case ProtocolMessageType.Control: return 'Control';
|
||||
case ProtocolMessageType.Ack: return 'Ack';
|
||||
case ProtocolMessageType.KeepAlive: return 'KeepAlive';
|
||||
}
|
||||
}
|
||||
|
||||
export const enum ProtocolConstants {
|
||||
HeaderLength = 13,
|
||||
/**
|
||||
* Send an Acknowledge message at most 2 seconds later...
|
||||
*/
|
||||
AcknowledgeTime = 2000, // 2 seconds
|
||||
/**
|
||||
* If there is a message that has been unacknowledged for 10 seconds, consider the connection closed...
|
||||
*/
|
||||
AcknowledgeTimeoutTime = 10000, // 10 seconds
|
||||
/**
|
||||
* Send at least a message every 30s for keep alive reasons.
|
||||
*/
|
||||
KeepAliveTime = 30000, // 30 seconds
|
||||
/**
|
||||
* If there is no message received for 60 seconds, consider the connection closed...
|
||||
*/
|
||||
KeepAliveTimeoutTime = 60000, // 60 seconds
|
||||
/**
|
||||
* If there is no reconnection within this time-frame, consider the connection permanently closed...
|
||||
*/
|
||||
ReconnectionGraceTime = 60 * 60 * 1000, // 1hr
|
||||
}
|
||||
|
||||
class ProtocolMessage {
|
||||
|
||||
public writtenTime: number;
|
||||
|
||||
constructor(
|
||||
public readonly type: ProtocolMessageType,
|
||||
public readonly id: number,
|
||||
public readonly ack: number,
|
||||
public readonly data: Buffer
|
||||
) {
|
||||
this.writtenTime = 0;
|
||||
}
|
||||
|
||||
public get size(): number {
|
||||
return this.data.byteLength;
|
||||
}
|
||||
}
|
||||
|
||||
class ProtocolReader {
|
||||
|
||||
private readonly _socket: Socket;
|
||||
private _isDisposed: boolean;
|
||||
private readonly _incomingData: ChunkStream;
|
||||
private readonly _socketDataListener: (data: Buffer) => void;
|
||||
public lastReadTime: number;
|
||||
|
||||
private readonly _onMessage = new Emitter<ProtocolMessage>();
|
||||
public readonly onMessage: Event<ProtocolMessage> = this._onMessage.event;
|
||||
|
||||
private readonly _state = {
|
||||
readHead: true,
|
||||
readLen: ProtocolConstants.HeaderLength,
|
||||
messageType: ProtocolMessageType.None,
|
||||
id: 0,
|
||||
ack: 0
|
||||
};
|
||||
|
||||
constructor(socket: Socket) {
|
||||
this._socket = socket;
|
||||
this._isDisposed = false;
|
||||
this._incomingData = new ChunkStream();
|
||||
this._socketDataListener = (data: Buffer) => this.acceptChunk(data);
|
||||
this._socket.on('data', this._socketDataListener);
|
||||
this.lastReadTime = Date.now();
|
||||
}
|
||||
|
||||
public acceptChunk(data: Buffer | null): void {
|
||||
if (!data || data.byteLength === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.lastReadTime = Date.now();
|
||||
|
||||
this._incomingData.acceptChunk(data);
|
||||
|
||||
while (this._incomingData.byteLength >= this._state.readLen) {
|
||||
|
||||
const buff = this._incomingData.read(this._state.readLen);
|
||||
|
||||
if (this._state.readHead) {
|
||||
// buff is the header
|
||||
|
||||
// save new state => next time will read the body
|
||||
this._state.readHead = false;
|
||||
this._state.readLen = buff.readUInt32BE(9, true);
|
||||
this._state.messageType = <ProtocolMessageType>buff.readUInt8(0, true);
|
||||
this._state.id = buff.readUInt32BE(1, true);
|
||||
this._state.ack = buff.readUInt32BE(5, true);
|
||||
} else {
|
||||
// buff is the body
|
||||
const messageType = this._state.messageType;
|
||||
const id = this._state.id;
|
||||
const ack = this._state.ack;
|
||||
|
||||
// save new state => next time will read the header
|
||||
this._state.readHead = true;
|
||||
this._state.readLen = ProtocolConstants.HeaderLength;
|
||||
this._state.messageType = ProtocolMessageType.None;
|
||||
this._state.id = 0;
|
||||
this._state.ack = 0;
|
||||
|
||||
this._onMessage.fire(new ProtocolMessage(messageType, id, ack, buff));
|
||||
|
||||
if (this._isDisposed) {
|
||||
// check if an event listener lead to our disposal
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public readEntireBuffer(): Buffer {
|
||||
return this._incomingData.read(this._incomingData.byteLength);
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
this._isDisposed = true;
|
||||
this._socket.removeListener('data', this._socketDataListener);
|
||||
}
|
||||
}
|
||||
|
||||
class ProtocolWriter {
|
||||
|
||||
private _isDisposed: boolean;
|
||||
private readonly _socket: Socket;
|
||||
private readonly _logFile: number;
|
||||
private _data: Buffer[];
|
||||
private _totalLength;
|
||||
public lastWriteTime: number;
|
||||
|
||||
constructor(socket: Socket, logFile: number) {
|
||||
this._isDisposed = false;
|
||||
this._socket = socket;
|
||||
this._logFile = logFile;
|
||||
this._data = [];
|
||||
this._totalLength = 0;
|
||||
this.lastWriteTime = 0;
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
this.flush();
|
||||
this._isDisposed = true;
|
||||
}
|
||||
|
||||
public flush(): void {
|
||||
// flush
|
||||
this._writeNow();
|
||||
}
|
||||
|
||||
public write(msg: ProtocolMessage) {
|
||||
if (this._isDisposed) {
|
||||
console.warn(`Cannot write message in a disposed ProtocolWriter`);
|
||||
console.warn(msg);
|
||||
return;
|
||||
}
|
||||
if (this._logFile) {
|
||||
log(this._logFile, `send-${ProtocolMessageTypeToString(msg.type)}-${msg.id}-${msg.ack}-`, msg.data);
|
||||
}
|
||||
msg.writtenTime = Date.now();
|
||||
this.lastWriteTime = Date.now();
|
||||
const header = Buffer.allocUnsafe(ProtocolConstants.HeaderLength);
|
||||
header.writeUInt8(msg.type, 0, true);
|
||||
header.writeUInt32BE(msg.id, 1, true);
|
||||
header.writeUInt32BE(msg.ack, 5, true);
|
||||
header.writeUInt32BE(msg.data.length, 9, true);
|
||||
this._writeSoon(header, msg.data);
|
||||
}
|
||||
|
||||
private _bufferAdd(head: Buffer, body: Buffer): boolean {
|
||||
const wasEmpty = this._totalLength === 0;
|
||||
this._data.push(head, body);
|
||||
this._totalLength += head.length + body.length;
|
||||
return wasEmpty;
|
||||
}
|
||||
|
||||
private _bufferTake(): Buffer {
|
||||
const ret = Buffer.concat(this._data, this._totalLength);
|
||||
this._data.length = 0;
|
||||
this._totalLength = 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
private _writeSoon(header: Buffer, data: Buffer): void {
|
||||
if (this._bufferAdd(header, data)) {
|
||||
setImmediate(() => {
|
||||
this._writeNow();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private _writeNow(): void {
|
||||
if (this._totalLength === 0) {
|
||||
return;
|
||||
}
|
||||
// return early if socket has been destroyed in the meantime
|
||||
if (this._socket.destroyed) {
|
||||
return;
|
||||
}
|
||||
// we ignore the returned value from `write` because we would have to cached the data
|
||||
// anyways and nodejs is already doing that for us:
|
||||
// > https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback
|
||||
// > However, the false return value is only advisory and the writable stream will unconditionally
|
||||
// > accept and buffer chunk even if it has not not been allowed to drain.
|
||||
this._socket.write(this._bufferTake());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has the following format:
|
||||
* ```
|
||||
* /-------------------------------|------\
|
||||
* | HEADER | |
|
||||
* |-------------------------------| DATA |
|
||||
* | TYPE | ID | ACK | DATA_LENGTH | |
|
||||
* \-------------------------------|------/
|
||||
* ```
|
||||
* The header is 9 bytes and consists of:
|
||||
* - TYPE is 1 byte (ProtocolMessageType) - the message type
|
||||
* - ID is 4 bytes (u32be) - the message id (can be 0 to indicate to be ignored)
|
||||
* - ACK is 4 bytes (u32be) - the acknowledged message id (can be 0 to indicate to be ignored)
|
||||
* - DATA_LENGTH is 4 bytes (u32be) - the length in bytes of DATA
|
||||
*
|
||||
* Only Regular messages are counted, other messages are not counted, nor acknowledged.
|
||||
*/
|
||||
export class Protocol implements IDisposable, IMessagePassingProtocol {
|
||||
|
||||
private _socket: Socket;
|
||||
private _socketWriter: ProtocolWriter;
|
||||
private _socketReader: ProtocolReader;
|
||||
|
||||
private _socketCloseListener: () => void;
|
||||
|
||||
private _onMessage = new Emitter<Buffer>();
|
||||
readonly onMessage: Event<Buffer> = this._onMessage.event;
|
||||
|
||||
private _onClose = new Emitter<void>();
|
||||
readonly onClose: Event<void> = this._onClose.event;
|
||||
|
||||
constructor(socket: Socket) {
|
||||
this._socket = socket;
|
||||
this._socketWriter = new ProtocolWriter(this._socket, 0);
|
||||
this._socketReader = new ProtocolReader(this._socket);
|
||||
|
||||
this._socketReader.onMessage((msg) => {
|
||||
if (msg.type === ProtocolMessageType.Regular) {
|
||||
this._onMessage.fire(msg.data);
|
||||
}
|
||||
});
|
||||
|
||||
this._socketCloseListener = () => {
|
||||
this._onClose.fire();
|
||||
};
|
||||
this._socket.once('close', this._socketCloseListener);
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
this._socketWriter.dispose();
|
||||
this._socketReader.dispose();
|
||||
this._socket.removeListener('close', this._socketCloseListener);
|
||||
}
|
||||
|
||||
getSocket(): Socket {
|
||||
return this._socket;
|
||||
}
|
||||
|
||||
send(buffer: Buffer): void {
|
||||
this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.Regular, 0, 0, buffer));
|
||||
}
|
||||
}
|
||||
|
||||
export class Server extends IPCServer {
|
||||
|
||||
private static toClientConnectionEvent(server: NetServer): Event<ClientConnectionEvent> {
|
||||
const onConnection = Event.fromNodeEventEmitter<Socket>(server, 'connection');
|
||||
|
||||
return Event.map(onConnection, socket => ({
|
||||
protocol: new Protocol(socket),
|
||||
protocol: new Protocol(new NodeSocket(socket)),
|
||||
onDidClientDisconnect: Event.once(Event.fromNodeEventEmitter<void>(socket, 'close'))
|
||||
}));
|
||||
}
|
||||
@@ -422,26 +98,6 @@ export class Server extends IPCServer {
|
||||
}
|
||||
}
|
||||
|
||||
export class Client<TContext = string> extends IPCClient<TContext> {
|
||||
|
||||
static fromSocket<TContext = string>(socket: Socket, id: TContext): Client<TContext> {
|
||||
return new Client(new Protocol(socket), id);
|
||||
}
|
||||
|
||||
get onClose(): Event<void> { return this.protocol.onClose; }
|
||||
|
||||
constructor(private protocol: Protocol | PersistentProtocol, id: TContext) {
|
||||
super(protocol, id);
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
super.dispose();
|
||||
const socket = this.protocol.getSocket();
|
||||
this.protocol.dispose();
|
||||
socket.end();
|
||||
}
|
||||
}
|
||||
|
||||
export function serve(port: number): Promise<Server>;
|
||||
export function serve(namedPipe: string): Promise<Server>;
|
||||
export function serve(hook: any): Promise<Server> {
|
||||
@@ -463,440 +119,9 @@ export function connect(hook: any, clientId: string): Promise<Client> {
|
||||
return new Promise<Client>((c, e) => {
|
||||
const socket = createConnection(hook, () => {
|
||||
socket.removeListener('error', e);
|
||||
c(Client.fromSocket(socket, clientId));
|
||||
c(Client.fromSocket(new NodeSocket(socket), clientId));
|
||||
});
|
||||
|
||||
socket.once('error', e);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Will ensure no messages are lost if there are no event listeners.
|
||||
*/
|
||||
function createBufferedEvent<T>(source: Event<T>): Event<T> {
|
||||
let emitter: Emitter<T>;
|
||||
let hasListeners = false;
|
||||
let isDeliveringMessages = false;
|
||||
let bufferedMessages: T[] = [];
|
||||
|
||||
const deliverMessages = () => {
|
||||
if (isDeliveringMessages) {
|
||||
return;
|
||||
}
|
||||
isDeliveringMessages = true;
|
||||
while (hasListeners && bufferedMessages.length > 0) {
|
||||
emitter.fire(bufferedMessages.shift()!);
|
||||
}
|
||||
isDeliveringMessages = false;
|
||||
};
|
||||
|
||||
source((e: T) => {
|
||||
bufferedMessages.push(e);
|
||||
deliverMessages();
|
||||
});
|
||||
|
||||
emitter = new Emitter<T>({
|
||||
onFirstListenerAdd: () => {
|
||||
hasListeners = true;
|
||||
// it is important to deliver these messages after this call, but before
|
||||
// other messages have a chance to be received (to guarantee in order delivery)
|
||||
// that's why we're using here nextTick and not other types of timeouts
|
||||
process.nextTick(deliverMessages);
|
||||
},
|
||||
onLastListenerRemove: () => {
|
||||
hasListeners = false;
|
||||
}
|
||||
});
|
||||
|
||||
return emitter.event;
|
||||
}
|
||||
|
||||
class QueueElement<T> {
|
||||
public readonly data: T;
|
||||
public next: QueueElement<T> | null;
|
||||
|
||||
constructor(data: T) {
|
||||
this.data = data;
|
||||
this.next = null;
|
||||
}
|
||||
}
|
||||
|
||||
class Queue<T> {
|
||||
|
||||
private _first: QueueElement<T> | null;
|
||||
private _last: QueueElement<T> | null;
|
||||
|
||||
constructor() {
|
||||
this._first = null;
|
||||
this._last = null;
|
||||
}
|
||||
|
||||
public peek(): T | null {
|
||||
if (!this._first) {
|
||||
return null;
|
||||
}
|
||||
return this._first.data;
|
||||
}
|
||||
|
||||
public toArray(): T[] {
|
||||
let result: T[] = [], resultLen = 0;
|
||||
let it = this._first;
|
||||
while (it) {
|
||||
result[resultLen++] = it.data;
|
||||
it = it.next;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public pop(): void {
|
||||
if (!this._first) {
|
||||
return;
|
||||
}
|
||||
if (this._first === this._last) {
|
||||
this._first = null;
|
||||
this._last = null;
|
||||
return;
|
||||
}
|
||||
this._first = this._first.next;
|
||||
}
|
||||
|
||||
public push(item: T): void {
|
||||
const element = new QueueElement(item);
|
||||
if (!this._first) {
|
||||
this._first = element;
|
||||
this._last = element;
|
||||
return;
|
||||
}
|
||||
this._last!.next = element;
|
||||
this._last = element;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as Protocol, but will actually track messages and acks.
|
||||
* Moreover, it will ensure no messages are lost if there are no event listeners.
|
||||
*/
|
||||
export class PersistentProtocol {
|
||||
|
||||
private _logFile: number;
|
||||
private _isReconnecting: boolean;
|
||||
|
||||
private _outgoingUnackMsg: Queue<ProtocolMessage>;
|
||||
private _outgoingMsgId: number;
|
||||
private _outgoingAckId: number;
|
||||
private _outgoingAckTimeout: NodeJS.Timeout | null;
|
||||
|
||||
private _incomingMsgId: number;
|
||||
private _incomingAckId: number;
|
||||
private _incomingMsgLastTime: number;
|
||||
private _incomingAckTimeout: NodeJS.Timeout | null;
|
||||
|
||||
private _outgoingKeepAliveTimeout: NodeJS.Timeout | null;
|
||||
private _incomingKeepAliveTimeout: NodeJS.Timeout | null;
|
||||
|
||||
private _socket: Socket;
|
||||
private _socketWriter: ProtocolWriter;
|
||||
private _socketReader: ProtocolReader;
|
||||
private _socketReaderListener: IDisposable;
|
||||
|
||||
private readonly _socketCloseListener: () => void;
|
||||
private readonly _socketEndListener: () => void;
|
||||
private readonly _socketErrorListener: (err: any) => void;
|
||||
|
||||
private _onControlMessage = new Emitter<Buffer>();
|
||||
readonly onControlMessage: Event<Buffer> = createBufferedEvent(this._onControlMessage.event);
|
||||
|
||||
private _onMessage = new Emitter<Buffer>();
|
||||
readonly onMessage: Event<Buffer> = createBufferedEvent(this._onMessage.event);
|
||||
|
||||
private _onClose = new Emitter<void>();
|
||||
readonly onClose: Event<void> = createBufferedEvent(this._onClose.event);
|
||||
|
||||
private _onSocketClose = new Emitter<void>();
|
||||
readonly onSocketClose: Event<void> = createBufferedEvent(this._onSocketClose.event);
|
||||
|
||||
private _onSocketTimeout = new Emitter<void>();
|
||||
readonly onSocketTimeout: Event<void> = createBufferedEvent(this._onSocketTimeout.event);
|
||||
|
||||
public get unacknowledgedCount(): number {
|
||||
return this._outgoingMsgId - this._outgoingAckId;
|
||||
}
|
||||
|
||||
constructor(socket: Socket, initialChunk: Buffer | null = null, logFileName: string | null = null) {
|
||||
this._logFile = 0;
|
||||
this._isReconnecting = false;
|
||||
if (logFileName) {
|
||||
console.log(`PersistentProtocol log file: ${logFileName}`);
|
||||
this._logFile = fs.openSync(logFileName, 'a');
|
||||
}
|
||||
this._outgoingUnackMsg = new Queue<ProtocolMessage>();
|
||||
this._outgoingMsgId = 0;
|
||||
this._outgoingAckId = 0;
|
||||
this._outgoingAckTimeout = null;
|
||||
|
||||
this._incomingMsgId = 0;
|
||||
this._incomingAckId = 0;
|
||||
this._incomingMsgLastTime = 0;
|
||||
this._incomingAckTimeout = null;
|
||||
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
|
||||
this._socketCloseListener = () => {
|
||||
console.log(`socket triggered close event!`);
|
||||
this._onSocketClose.fire();
|
||||
};
|
||||
this._socketEndListener = () => {
|
||||
// received FIN
|
||||
this._onClose.fire();
|
||||
};
|
||||
this._socketErrorListener = (err) => {
|
||||
console.log(`socket had an error: `, err);
|
||||
};
|
||||
|
||||
this._socket = socket;
|
||||
this._socketWriter = new ProtocolWriter(this._socket, this._logFile);
|
||||
this._socketReader = new ProtocolReader(this._socket);
|
||||
this._socketReaderListener = this._socketReader.onMessage(msg => this._receiveMessage(msg));
|
||||
this._socket.on('close', this._socketCloseListener);
|
||||
this._socket.on('end', this._socketEndListener);
|
||||
this._socket.on('error', this._socketErrorListener);
|
||||
if (initialChunk) {
|
||||
this._socketReader.acceptChunk(initialChunk);
|
||||
}
|
||||
|
||||
this._sendKeepAliveCheck();
|
||||
this._recvKeepAliveCheck();
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
if (this._outgoingAckTimeout) {
|
||||
clearTimeout(this._outgoingAckTimeout);
|
||||
this._outgoingAckTimeout = null;
|
||||
}
|
||||
if (this._incomingAckTimeout) {
|
||||
clearTimeout(this._incomingAckTimeout);
|
||||
this._incomingAckTimeout = null;
|
||||
}
|
||||
if (this._outgoingKeepAliveTimeout) {
|
||||
clearTimeout(this._outgoingKeepAliveTimeout);
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
}
|
||||
if (this._incomingKeepAliveTimeout) {
|
||||
clearTimeout(this._incomingKeepAliveTimeout);
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
}
|
||||
if (this._logFile) {
|
||||
fs.closeSync(this._logFile);
|
||||
}
|
||||
this._socketWriter.dispose();
|
||||
this._socketReader.dispose();
|
||||
this._socketReaderListener.dispose();
|
||||
this._socket.removeListener('close', this._socketCloseListener);
|
||||
this._socket.removeListener('end', this._socketEndListener);
|
||||
this._socket.removeListener('error', this._socketErrorListener);
|
||||
}
|
||||
|
||||
private _sendKeepAliveCheck(): void {
|
||||
if (this._outgoingKeepAliveTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastOutgoingMsg = Date.now() - this._socketWriter.lastWriteTime;
|
||||
if (timeSinceLastOutgoingMsg >= ProtocolConstants.KeepAliveTime) {
|
||||
// sufficient time has passed since last message was written,
|
||||
// and no message from our side needed to be sent in the meantime,
|
||||
// so we will send a message containing only a keep alive.
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, 0, EMPTY_BUFFER);
|
||||
this._socketWriter.write(msg);
|
||||
this._sendKeepAliveCheck();
|
||||
return;
|
||||
}
|
||||
|
||||
this._outgoingKeepAliveTimeout = setTimeout(() => {
|
||||
this._outgoingKeepAliveTimeout = null;
|
||||
this._sendKeepAliveCheck();
|
||||
}, ProtocolConstants.KeepAliveTime - timeSinceLastOutgoingMsg + 5);
|
||||
}
|
||||
|
||||
private _recvKeepAliveCheck(): void {
|
||||
if (this._incomingKeepAliveTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastIncomingMsg = Date.now() - this._socketReader.lastReadTime;
|
||||
if (timeSinceLastIncomingMsg >= ProtocolConstants.KeepAliveTimeoutTime) {
|
||||
// Trash the socket
|
||||
this._onSocketTimeout.fire(undefined);
|
||||
return;
|
||||
}
|
||||
|
||||
this._incomingKeepAliveTimeout = setTimeout(() => {
|
||||
this._incomingKeepAliveTimeout = null;
|
||||
this._recvKeepAliveCheck();
|
||||
}, ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg + 5);
|
||||
}
|
||||
|
||||
public getSocket(): Socket {
|
||||
return this._socket;
|
||||
}
|
||||
|
||||
public beginAcceptReconnection(socket: Socket, initialDataChunk: Buffer | null): void {
|
||||
this._isReconnecting = true;
|
||||
|
||||
this._socketWriter.dispose();
|
||||
this._socketReader.dispose();
|
||||
this._socketReaderListener.dispose();
|
||||
this._socket.removeListener('close', this._socketCloseListener);
|
||||
this._socket.removeListener('end', this._socketEndListener);
|
||||
this._socket.removeListener('error', this._socketErrorListener);
|
||||
|
||||
this._socket = socket;
|
||||
|
||||
this._socketWriter = new ProtocolWriter(this._socket, this._logFile);
|
||||
this._socketReader = new ProtocolReader(this._socket);
|
||||
this._socketReaderListener = this._socketReader.onMessage(msg => this._receiveMessage(msg));
|
||||
this._socketReader.acceptChunk(initialDataChunk);
|
||||
this._socket.on('close', this._socketCloseListener);
|
||||
this._socket.on('end', this._socketEndListener);
|
||||
this._socket.on('error', this._socketErrorListener);
|
||||
}
|
||||
|
||||
public endAcceptReconnection(): void {
|
||||
this._isReconnecting = false;
|
||||
|
||||
// Send again all unacknowledged messages
|
||||
const toSend = this._outgoingUnackMsg.toArray();
|
||||
for (let i = 0, len = toSend.length; i < len; i++) {
|
||||
this._socketWriter.write(toSend[i]);
|
||||
}
|
||||
this._recvAckCheck();
|
||||
|
||||
this._sendKeepAliveCheck();
|
||||
this._recvKeepAliveCheck();
|
||||
}
|
||||
|
||||
private _receiveMessage(msg: ProtocolMessage): void {
|
||||
if (this._logFile) {
|
||||
log(this._logFile, `recv-${ProtocolMessageTypeToString(msg.type)}-${msg.id}-${msg.ack}-`, msg.data);
|
||||
}
|
||||
if (msg.ack > this._outgoingAckId) {
|
||||
this._outgoingAckId = msg.ack;
|
||||
do {
|
||||
const first = this._outgoingUnackMsg.peek();
|
||||
if (first && first.id <= msg.ack) {
|
||||
// this message has been confirmed, remove it
|
||||
this._outgoingUnackMsg.pop();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
|
||||
if (msg.type === ProtocolMessageType.Regular) {
|
||||
if (msg.id > this._incomingMsgId) {
|
||||
if (msg.id !== this._incomingMsgId + 1) {
|
||||
console.error(`PROTOCOL CORRUPTION, LAST SAW MSG ${this._incomingMsgId} AND HAVE NOW RECEIVED MSG ${msg.id}`);
|
||||
}
|
||||
this._incomingMsgId = msg.id;
|
||||
this._incomingMsgLastTime = Date.now();
|
||||
this._sendAckCheck();
|
||||
this._onMessage.fire(msg.data);
|
||||
}
|
||||
} else if (msg.type === ProtocolMessageType.Control) {
|
||||
this._onControlMessage.fire(msg.data);
|
||||
}
|
||||
}
|
||||
|
||||
readEntireBuffer(): Buffer {
|
||||
return this._socketReader.readEntireBuffer();
|
||||
}
|
||||
|
||||
flush(): void {
|
||||
this._socketWriter.flush();
|
||||
}
|
||||
|
||||
send(buffer: Buffer): void {
|
||||
const myId = ++this._outgoingMsgId;
|
||||
this._incomingAckId = this._incomingMsgId;
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Regular, myId, this._incomingAckId, buffer);
|
||||
this._outgoingUnackMsg.push(msg);
|
||||
if (!this._isReconnecting) {
|
||||
this._socketWriter.write(msg);
|
||||
this._recvAckCheck();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message which will not be part of the regular acknowledge flow.
|
||||
* Use this for early control messages which are repeated in case of reconnection.
|
||||
*/
|
||||
sendControl(buffer: Buffer): void {
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Control, 0, 0, buffer);
|
||||
this._socketWriter.write(msg);
|
||||
}
|
||||
|
||||
private _sendAckCheck(): void {
|
||||
if (this._incomingMsgId <= this._incomingAckId) {
|
||||
// nothink to acknowledge
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._incomingAckTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const timeSinceLastIncomingMsg = Date.now() - this._incomingMsgLastTime;
|
||||
if (timeSinceLastIncomingMsg >= ProtocolConstants.AcknowledgeTime) {
|
||||
// sufficient time has passed since this message has been received,
|
||||
// and no message from our side needed to be sent in the meantime,
|
||||
// so we will send a message containing only an ack.
|
||||
this._sendAck();
|
||||
return;
|
||||
}
|
||||
|
||||
this._incomingAckTimeout = setTimeout(() => {
|
||||
this._incomingAckTimeout = null;
|
||||
this._sendAckCheck();
|
||||
}, ProtocolConstants.AcknowledgeTime - timeSinceLastIncomingMsg + 5);
|
||||
}
|
||||
|
||||
private _recvAckCheck(): void {
|
||||
if (this._outgoingMsgId <= this._outgoingAckId) {
|
||||
// everything has been acknowledged
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._outgoingAckTimeout) {
|
||||
// there will be a check in the near future
|
||||
return;
|
||||
}
|
||||
|
||||
const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!;
|
||||
const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime;
|
||||
if (timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.AcknowledgeTimeoutTime) {
|
||||
// Trash the socket
|
||||
this._onSocketTimeout.fire(undefined);
|
||||
return;
|
||||
}
|
||||
|
||||
this._outgoingAckTimeout = setTimeout(() => {
|
||||
this._outgoingAckTimeout = null;
|
||||
this._recvAckCheck();
|
||||
}, ProtocolConstants.AcknowledgeTimeoutTime - timeSinceOldestUnacknowledgedMsg + 5);
|
||||
}
|
||||
|
||||
private _sendAck(): void {
|
||||
if (this._incomingMsgId <= this._incomingAckId) {
|
||||
// nothink to acknowledge
|
||||
return;
|
||||
}
|
||||
|
||||
this._incomingAckId = this._incomingMsgId;
|
||||
const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, EMPTY_BUFFER);
|
||||
this._socketWriter.write(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,733 +0,0 @@
|
||||
/*---------------------------------------------------------------------------------------------
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the Source EULA. See License.txt in the project root for license information.
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { IDisposable, toDisposable, combinedDisposable } from 'vs/base/common/lifecycle';
|
||||
import { Event, Emitter, Relay } from 'vs/base/common/event';
|
||||
import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/common/async';
|
||||
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
|
||||
import * as errors from 'vs/base/common/errors';
|
||||
import { IServerChannel, IChannel } from 'vs/base/parts/ipc/common/ipc';
|
||||
|
||||
export const enum RequestType {
|
||||
Promise = 100,
|
||||
PromiseCancel = 101,
|
||||
EventListen = 102,
|
||||
EventDispose = 103
|
||||
}
|
||||
|
||||
type IRawPromiseRequest = { type: RequestType.Promise; id: number; channelName: string; name: string; arg: any; };
|
||||
type IRawPromiseCancelRequest = { type: RequestType.PromiseCancel, id: number };
|
||||
type IRawEventListenRequest = { type: RequestType.EventListen; id: number; channelName: string; name: string; arg: any; };
|
||||
type IRawEventDisposeRequest = { type: RequestType.EventDispose, id: number };
|
||||
type IRawRequest = IRawPromiseRequest | IRawPromiseCancelRequest | IRawEventListenRequest | IRawEventDisposeRequest;
|
||||
|
||||
export const enum ResponseType {
|
||||
Initialize = 200,
|
||||
PromiseSuccess = 201,
|
||||
PromiseError = 202,
|
||||
PromiseErrorObj = 203,
|
||||
EventFire = 204
|
||||
}
|
||||
|
||||
type IRawInitializeResponse = { type: ResponseType.Initialize };
|
||||
type IRawPromiseSuccessResponse = { type: ResponseType.PromiseSuccess; id: number; data: any };
|
||||
type IRawPromiseErrorResponse = { type: ResponseType.PromiseError; id: number; data: { message: string, name: string, stack: string[] | undefined } };
|
||||
type IRawPromiseErrorObjResponse = { type: ResponseType.PromiseErrorObj; id: number; data: any };
|
||||
type IRawEventFireResponse = { type: ResponseType.EventFire; id: number; data: any };
|
||||
type IRawResponse = IRawInitializeResponse | IRawPromiseSuccessResponse | IRawPromiseErrorResponse | IRawPromiseErrorObjResponse | IRawEventFireResponse;
|
||||
|
||||
interface IHandler {
|
||||
(response: IRawResponse): void;
|
||||
}
|
||||
|
||||
export interface IMessagePassingProtocol {
|
||||
send(buffer: Buffer): void;
|
||||
onMessage: Event<Buffer>;
|
||||
}
|
||||
|
||||
enum State {
|
||||
Uninitialized,
|
||||
Idle
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IChannelServer` hosts a collection of channels. You are
|
||||
* able to register channels onto it, provided a channel name.
|
||||
*/
|
||||
export interface IChannelServer<TContext = string> {
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IChannelClient` has access to a collection of channels. You
|
||||
* are able to get those channels, given their channel name.
|
||||
*/
|
||||
export interface IChannelClient {
|
||||
getChannel<T extends IChannel>(channelName: string): T;
|
||||
}
|
||||
|
||||
export interface Client<TContext> {
|
||||
readonly ctx: TContext;
|
||||
}
|
||||
|
||||
export interface IConnectionHub<TContext> {
|
||||
readonly connections: Connection<TContext>[];
|
||||
readonly onDidChangeConnections: Event<Connection<TContext>>;
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IClientRouter` is responsible for routing calls to specific
|
||||
* channels, in scenarios in which there are multiple possible
|
||||
* channels (each from a separate client) to pick from.
|
||||
*/
|
||||
export interface IClientRouter<TContext = string> {
|
||||
routeCall(hub: IConnectionHub<TContext>, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<Client<TContext>>;
|
||||
routeEvent(hub: IConnectionHub<TContext>, event: string, arg?: any): Promise<Client<TContext>>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to the `IChannelClient`, you can get channels from this
|
||||
* collection of channels. The difference being that in the
|
||||
* `IRoutingChannelClient`, there are multiple clients providing
|
||||
* the same channel. You'll need to pass in an `IClientRouter` in
|
||||
* order to pick the right one.
|
||||
*/
|
||||
export interface IRoutingChannelClient<TContext = string> {
|
||||
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
|
||||
}
|
||||
|
||||
interface IReader {
|
||||
read(bytes: number): Buffer;
|
||||
}
|
||||
|
||||
interface IWriter {
|
||||
write(buffer: Buffer): void;
|
||||
}
|
||||
|
||||
class BufferReader implements IReader {
|
||||
|
||||
private pos = 0;
|
||||
|
||||
constructor(private buffer: Buffer) { }
|
||||
|
||||
read(bytes: number): Buffer {
|
||||
const result = this.buffer.slice(this.pos, this.pos + bytes);
|
||||
this.pos += result.length;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
class BufferWriter implements IWriter {
|
||||
|
||||
private buffers: Buffer[] = [];
|
||||
|
||||
get buffer(): Buffer {
|
||||
return Buffer.concat(this.buffers);
|
||||
}
|
||||
|
||||
write(buffer: Buffer): void {
|
||||
this.buffers.push(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
enum DataType {
|
||||
Undefined = 0,
|
||||
String = 1,
|
||||
Buffer = 2,
|
||||
Array = 3,
|
||||
Object = 4
|
||||
}
|
||||
|
||||
function createSizeBuffer(size: number): Buffer {
|
||||
const result = Buffer.allocUnsafe(4);
|
||||
result.writeUInt32BE(size, 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
function readSizeBuffer(reader: IReader): number {
|
||||
return reader.read(4).readUInt32BE(0);
|
||||
}
|
||||
|
||||
const BufferPresets = {
|
||||
Undefined: Buffer.alloc(1, DataType.Undefined),
|
||||
String: Buffer.alloc(1, DataType.String),
|
||||
Buffer: Buffer.alloc(1, DataType.Buffer),
|
||||
Array: Buffer.alloc(1, DataType.Array),
|
||||
Object: Buffer.alloc(1, DataType.Object)
|
||||
};
|
||||
|
||||
function serialize(writer: IWriter, data: any): void {
|
||||
if (typeof data === 'undefined') {
|
||||
writer.write(BufferPresets.Undefined);
|
||||
} else if (typeof data === 'string') {
|
||||
const buffer = Buffer.from(data);
|
||||
writer.write(BufferPresets.String);
|
||||
writer.write(createSizeBuffer(buffer.length));
|
||||
writer.write(buffer);
|
||||
} else if (Buffer.isBuffer(data)) {
|
||||
writer.write(BufferPresets.Buffer);
|
||||
writer.write(createSizeBuffer(data.length));
|
||||
writer.write(data);
|
||||
} else if (Array.isArray(data)) {
|
||||
writer.write(BufferPresets.Array);
|
||||
writer.write(createSizeBuffer(data.length));
|
||||
|
||||
for (const el of data) {
|
||||
serialize(writer, el);
|
||||
}
|
||||
} else {
|
||||
const buffer = Buffer.from(JSON.stringify(data));
|
||||
writer.write(BufferPresets.Object);
|
||||
writer.write(createSizeBuffer(buffer.length));
|
||||
writer.write(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
function deserialize(reader: IReader): any {
|
||||
const type = reader.read(1).readUInt8(0);
|
||||
|
||||
switch (type) {
|
||||
case DataType.Undefined: return undefined;
|
||||
case DataType.String: return reader.read(readSizeBuffer(reader)).toString();
|
||||
case DataType.Buffer: return reader.read(readSizeBuffer(reader));
|
||||
case DataType.Array: {
|
||||
const length = readSizeBuffer(reader);
|
||||
const result: any[] = [];
|
||||
|
||||
for (let i = 0; i < length; i++) {
|
||||
result.push(deserialize(reader));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
case DataType.Object: return JSON.parse(reader.read(readSizeBuffer(reader)).toString());
|
||||
}
|
||||
}
|
||||
|
||||
export class ChannelServer<TContext = string> implements IChannelServer<TContext>, IDisposable {
|
||||
|
||||
private channels = new Map<string, IServerChannel<TContext>>();
|
||||
private activeRequests = new Map<number, IDisposable>();
|
||||
private protocolListener: IDisposable | null;
|
||||
|
||||
constructor(private protocol: IMessagePassingProtocol, private ctx: TContext) {
|
||||
this.protocolListener = this.protocol.onMessage(msg => this.onRawMessage(msg));
|
||||
this.sendResponse({ type: ResponseType.Initialize });
|
||||
}
|
||||
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
|
||||
this.channels.set(channelName, channel);
|
||||
}
|
||||
|
||||
private sendResponse(response: IRawResponse): void {
|
||||
switch (response.type) {
|
||||
case ResponseType.Initialize:
|
||||
return this.send([response.type]);
|
||||
|
||||
case ResponseType.PromiseSuccess:
|
||||
case ResponseType.PromiseError:
|
||||
case ResponseType.EventFire:
|
||||
case ResponseType.PromiseErrorObj:
|
||||
return this.send([response.type, response.id], response.data);
|
||||
}
|
||||
}
|
||||
|
||||
private send(header: any, body: any = undefined): void {
|
||||
const writer = new BufferWriter();
|
||||
serialize(writer, header);
|
||||
serialize(writer, body);
|
||||
this.sendBuffer(writer.buffer);
|
||||
}
|
||||
|
||||
private sendBuffer(message: Buffer): void {
|
||||
try {
|
||||
this.protocol.send(message);
|
||||
} catch (err) {
|
||||
// noop
|
||||
}
|
||||
}
|
||||
|
||||
private onRawMessage(message: Buffer): void {
|
||||
const reader = new BufferReader(message);
|
||||
const header = deserialize(reader);
|
||||
const body = deserialize(reader);
|
||||
const type = header[0] as RequestType;
|
||||
|
||||
switch (type) {
|
||||
case RequestType.Promise:
|
||||
return this.onPromise({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
|
||||
case RequestType.EventListen:
|
||||
return this.onEventListen({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
|
||||
case RequestType.PromiseCancel:
|
||||
return this.disposeActiveRequest({ type, id: header[1] });
|
||||
case RequestType.EventDispose:
|
||||
return this.disposeActiveRequest({ type, id: header[1] });
|
||||
}
|
||||
}
|
||||
|
||||
private onPromise(request: IRawPromiseRequest): void {
|
||||
const channel = this.channels.get(request.channelName);
|
||||
if (!channel) {
|
||||
throw new Error('Unknown channel');
|
||||
}
|
||||
const cancellationTokenSource = new CancellationTokenSource();
|
||||
let promise: Promise<any>;
|
||||
|
||||
try {
|
||||
promise = channel.call(this.ctx, request.name, request.arg, cancellationTokenSource.token);
|
||||
} catch (err) {
|
||||
promise = Promise.reject(err);
|
||||
}
|
||||
|
||||
const id = request.id;
|
||||
|
||||
promise.then(data => {
|
||||
this.sendResponse(<IRawResponse>{ id, data, type: ResponseType.PromiseSuccess });
|
||||
this.activeRequests.delete(request.id);
|
||||
}, err => {
|
||||
if (err instanceof Error) {
|
||||
this.sendResponse(<IRawResponse>{
|
||||
id, data: {
|
||||
message: err.message,
|
||||
name: err.name,
|
||||
stack: err.stack ? (err.stack.split ? err.stack.split('\n') : err.stack) : undefined
|
||||
}, type: ResponseType.PromiseError
|
||||
});
|
||||
} else {
|
||||
this.sendResponse(<IRawResponse>{ id, data: err, type: ResponseType.PromiseErrorObj });
|
||||
}
|
||||
|
||||
this.activeRequests.delete(request.id);
|
||||
});
|
||||
|
||||
const disposable = toDisposable(() => cancellationTokenSource.cancel());
|
||||
this.activeRequests.set(request.id, disposable);
|
||||
}
|
||||
|
||||
private onEventListen(request: IRawEventListenRequest): void {
|
||||
const channel = this.channels.get(request.channelName);
|
||||
if (!channel) {
|
||||
throw new Error('Unknown channel');
|
||||
}
|
||||
|
||||
const id = request.id;
|
||||
const event = channel.listen(this.ctx, request.name, request.arg);
|
||||
const disposable = event(data => this.sendResponse(<IRawResponse>{ id, data, type: ResponseType.EventFire }));
|
||||
|
||||
this.activeRequests.set(request.id, disposable);
|
||||
}
|
||||
|
||||
private disposeActiveRequest(request: IRawRequest): void {
|
||||
const disposable = this.activeRequests.get(request.id);
|
||||
|
||||
if (disposable) {
|
||||
disposable.dispose();
|
||||
this.activeRequests.delete(request.id);
|
||||
}
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
if (this.protocolListener) {
|
||||
this.protocolListener.dispose();
|
||||
this.protocolListener = null;
|
||||
}
|
||||
this.activeRequests.forEach(d => d.dispose());
|
||||
this.activeRequests.clear();
|
||||
}
|
||||
}
|
||||
|
||||
export class ChannelClient implements IChannelClient, IDisposable {
|
||||
|
||||
private state: State = State.Uninitialized;
|
||||
private activeRequests = new Set<IDisposable>();
|
||||
private handlers = new Map<number, IHandler>();
|
||||
private lastRequestId: number = 0;
|
||||
private protocolListener: IDisposable | null;
|
||||
|
||||
private _onDidInitialize = new Emitter<void>();
|
||||
readonly onDidInitialize = this._onDidInitialize.event;
|
||||
|
||||
constructor(private protocol: IMessagePassingProtocol) {
|
||||
this.protocolListener = this.protocol.onMessage(msg => this.onBuffer(msg));
|
||||
}
|
||||
|
||||
getChannel<T extends IChannel>(channelName: string): T {
|
||||
const that = this;
|
||||
|
||||
return {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken) {
|
||||
return that.requestPromise(channelName, command, arg, cancellationToken);
|
||||
},
|
||||
listen(event: string, arg: any) {
|
||||
return that.requestEvent(channelName, event, arg);
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
private requestPromise(channelName: string, name: string, arg?: any, cancellationToken = CancellationToken.None): Promise<any> {
|
||||
const id = this.lastRequestId++;
|
||||
const type = RequestType.Promise;
|
||||
const request: IRawRequest = { id, type, channelName, name, arg };
|
||||
|
||||
if (cancellationToken.isCancellationRequested) {
|
||||
return Promise.reject(errors.canceled());
|
||||
}
|
||||
|
||||
let disposable: IDisposable;
|
||||
|
||||
const result = new Promise((c, e) => {
|
||||
if (cancellationToken.isCancellationRequested) {
|
||||
return e(errors.canceled());
|
||||
}
|
||||
|
||||
let uninitializedPromise: CancelablePromise<void> | null = createCancelablePromise(_ => this.whenInitialized());
|
||||
uninitializedPromise.then(() => {
|
||||
uninitializedPromise = null;
|
||||
|
||||
const handler: IHandler = response => {
|
||||
switch (response.type) {
|
||||
case ResponseType.PromiseSuccess:
|
||||
this.handlers.delete(id);
|
||||
c(response.data);
|
||||
break;
|
||||
|
||||
case ResponseType.PromiseError:
|
||||
this.handlers.delete(id);
|
||||
const error = new Error(response.data.message);
|
||||
(<any>error).stack = response.data.stack;
|
||||
error.name = response.data.name;
|
||||
e(error);
|
||||
break;
|
||||
|
||||
case ResponseType.PromiseErrorObj:
|
||||
this.handlers.delete(id);
|
||||
e(response.data);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
this.handlers.set(id, handler);
|
||||
this.sendRequest(request);
|
||||
});
|
||||
|
||||
const cancel = () => {
|
||||
if (uninitializedPromise) {
|
||||
uninitializedPromise.cancel();
|
||||
uninitializedPromise = null;
|
||||
} else {
|
||||
this.sendRequest({ id, type: RequestType.PromiseCancel });
|
||||
}
|
||||
|
||||
e(errors.canceled());
|
||||
};
|
||||
|
||||
const cancellationTokenListener = cancellationToken.onCancellationRequested(cancel);
|
||||
disposable = combinedDisposable([toDisposable(cancel), cancellationTokenListener]);
|
||||
this.activeRequests.add(disposable);
|
||||
});
|
||||
|
||||
return result.finally(() => this.activeRequests.delete(disposable));
|
||||
}
|
||||
|
||||
private requestEvent(channelName: string, name: string, arg?: any): Event<any> {
|
||||
const id = this.lastRequestId++;
|
||||
const type = RequestType.EventListen;
|
||||
const request: IRawRequest = { id, type, channelName, name, arg };
|
||||
|
||||
let uninitializedPromise: CancelablePromise<void> | null = null;
|
||||
|
||||
const emitter = new Emitter<any>({
|
||||
onFirstListenerAdd: () => {
|
||||
uninitializedPromise = createCancelablePromise(_ => this.whenInitialized());
|
||||
uninitializedPromise.then(() => {
|
||||
uninitializedPromise = null;
|
||||
this.activeRequests.add(emitter);
|
||||
this.sendRequest(request);
|
||||
});
|
||||
},
|
||||
onLastListenerRemove: () => {
|
||||
if (uninitializedPromise) {
|
||||
uninitializedPromise.cancel();
|
||||
uninitializedPromise = null;
|
||||
} else {
|
||||
this.activeRequests.delete(emitter);
|
||||
this.sendRequest({ id, type: RequestType.EventDispose });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const handler: IHandler = (res: IRawEventFireResponse) => emitter.fire(res.data);
|
||||
this.handlers.set(id, handler);
|
||||
|
||||
return emitter.event;
|
||||
}
|
||||
|
||||
private sendRequest(request: IRawRequest): void {
|
||||
switch (request.type) {
|
||||
case RequestType.Promise:
|
||||
case RequestType.EventListen:
|
||||
return this.send([request.type, request.id, request.channelName, request.name], request.arg);
|
||||
|
||||
case RequestType.PromiseCancel:
|
||||
case RequestType.EventDispose:
|
||||
return this.send([request.type, request.id]);
|
||||
}
|
||||
}
|
||||
|
||||
private send(header: any, body: any = undefined): void {
|
||||
const writer = new BufferWriter();
|
||||
serialize(writer, header);
|
||||
serialize(writer, body);
|
||||
this.sendBuffer(writer.buffer);
|
||||
}
|
||||
|
||||
private sendBuffer(message: Buffer): void {
|
||||
try {
|
||||
this.protocol.send(message);
|
||||
} catch (err) {
|
||||
// noop
|
||||
}
|
||||
}
|
||||
|
||||
private onBuffer(message: Buffer): void {
|
||||
const reader = new BufferReader(message);
|
||||
const header = deserialize(reader);
|
||||
const body = deserialize(reader);
|
||||
const type: ResponseType = header[0];
|
||||
|
||||
switch (type) {
|
||||
case ResponseType.Initialize:
|
||||
return this.onResponse({ type: header[0] });
|
||||
|
||||
case ResponseType.PromiseSuccess:
|
||||
case ResponseType.PromiseError:
|
||||
case ResponseType.EventFire:
|
||||
case ResponseType.PromiseErrorObj:
|
||||
return this.onResponse({ type: header[0], id: header[1], data: body });
|
||||
}
|
||||
}
|
||||
|
||||
private onResponse(response: IRawResponse): void {
|
||||
if (response.type === ResponseType.Initialize) {
|
||||
this.state = State.Idle;
|
||||
this._onDidInitialize.fire();
|
||||
return;
|
||||
}
|
||||
|
||||
const handler = this.handlers.get(response.id);
|
||||
|
||||
if (handler) {
|
||||
handler(response);
|
||||
}
|
||||
}
|
||||
|
||||
private whenInitialized(): Promise<void> {
|
||||
if (this.state === State.Idle) {
|
||||
return Promise.resolve();
|
||||
} else {
|
||||
return Event.toPromise(this.onDidInitialize);
|
||||
}
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
if (this.protocolListener) {
|
||||
this.protocolListener.dispose();
|
||||
this.protocolListener = null;
|
||||
}
|
||||
this.activeRequests.forEach(p => p.dispose());
|
||||
this.activeRequests.clear();
|
||||
}
|
||||
}
|
||||
|
||||
export interface ClientConnectionEvent {
|
||||
protocol: IMessagePassingProtocol;
|
||||
onDidClientDisconnect: Event<void>;
|
||||
}
|
||||
|
||||
interface Connection<TContext> extends Client<TContext> {
|
||||
readonly channelClient: ChannelClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IPCServer` is both a channel server and a routing channel
|
||||
* client.
|
||||
*
|
||||
* As the owner of a protocol, you should extend both this
|
||||
* and the `IPCClient` classes to get IPC implementations
|
||||
* for your protocol.
|
||||
*/
|
||||
export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable {
|
||||
|
||||
private channels = new Map<string, IServerChannel<TContext>>();
|
||||
private _connections = new Set<Connection<TContext>>();
|
||||
|
||||
private _onDidChangeConnections = new Emitter<Connection<TContext>>();
|
||||
readonly onDidChangeConnections: Event<Connection<TContext>> = this._onDidChangeConnections.event;
|
||||
|
||||
get connections(): Connection<TContext>[] {
|
||||
const result: Connection<TContext>[] = [];
|
||||
this._connections.forEach(ctx => result.push(ctx));
|
||||
return result;
|
||||
}
|
||||
|
||||
constructor(onDidClientConnect: Event<ClientConnectionEvent>) {
|
||||
onDidClientConnect(({ protocol, onDidClientDisconnect }) => {
|
||||
const onFirstMessage = Event.once(protocol.onMessage);
|
||||
|
||||
onFirstMessage(msg => {
|
||||
const reader = new BufferReader(msg);
|
||||
const ctx = deserialize(reader) as TContext;
|
||||
|
||||
const channelServer = new ChannelServer(protocol, ctx);
|
||||
const channelClient = new ChannelClient(protocol);
|
||||
|
||||
this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel));
|
||||
|
||||
const connection: Connection<TContext> = { channelClient, ctx };
|
||||
this._connections.add(connection);
|
||||
this._onDidChangeConnections.fire(connection);
|
||||
|
||||
onDidClientDisconnect(() => {
|
||||
channelServer.dispose();
|
||||
channelClient.dispose();
|
||||
this._connections.delete(connection);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T {
|
||||
const that = this;
|
||||
|
||||
return {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken) {
|
||||
const channelPromise = router.routeCall(that, command, arg)
|
||||
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
|
||||
|
||||
return getDelayedChannel(channelPromise)
|
||||
.call(command, arg, cancellationToken);
|
||||
},
|
||||
listen(event: string, arg: any) {
|
||||
const channelPromise = router.routeEvent(that, event, arg)
|
||||
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
|
||||
|
||||
return getDelayedChannel(channelPromise)
|
||||
.listen(event, arg);
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
|
||||
this.channels.set(channelName, channel);
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
this.channels.clear();
|
||||
this._connections.clear();
|
||||
this._onDidChangeConnections.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An `IPCClient` is both a channel client and a channel server.
|
||||
*
|
||||
* As the owner of a protocol, you should extend both this
|
||||
* and the `IPCClient` classes to get IPC implementations
|
||||
* for your protocol.
|
||||
*/
|
||||
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable {
|
||||
|
||||
private channelClient: ChannelClient;
|
||||
private channelServer: ChannelServer<TContext>;
|
||||
|
||||
constructor(protocol: IMessagePassingProtocol, ctx: TContext) {
|
||||
const writer = new BufferWriter();
|
||||
serialize(writer, ctx);
|
||||
protocol.send(writer.buffer);
|
||||
|
||||
this.channelClient = new ChannelClient(protocol);
|
||||
this.channelServer = new ChannelServer(protocol, ctx);
|
||||
}
|
||||
|
||||
getChannel<T extends IChannel>(channelName: string): T {
|
||||
return this.channelClient.getChannel(channelName) as T;
|
||||
}
|
||||
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
|
||||
this.channelServer.registerChannel(channelName, channel);
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
this.channelClient.dispose();
|
||||
this.channelServer.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
export function getDelayedChannel<T extends IChannel>(promise: Promise<T>): T {
|
||||
return {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
|
||||
return promise.then(c => c.call<T>(command, arg, cancellationToken));
|
||||
},
|
||||
|
||||
listen<T>(event: string, arg?: any): Event<T> {
|
||||
const relay = new Relay<any>();
|
||||
promise.then(c => relay.input = c.listen(event, arg));
|
||||
return relay.event;
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
export function getNextTickChannel<T extends IChannel>(channel: T): T {
|
||||
let didTick = false;
|
||||
|
||||
return {
|
||||
call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
|
||||
if (didTick) {
|
||||
return channel.call(command, arg, cancellationToken);
|
||||
}
|
||||
|
||||
return timeout(0)
|
||||
.then(() => didTick = true)
|
||||
.then(() => channel.call<T>(command, arg, cancellationToken));
|
||||
},
|
||||
listen<T>(event: string, arg?: any): Event<T> {
|
||||
if (didTick) {
|
||||
return channel.listen<T>(event, arg);
|
||||
}
|
||||
|
||||
const relay = new Relay<T>();
|
||||
|
||||
timeout(0)
|
||||
.then(() => didTick = true)
|
||||
.then(() => relay.input = channel.listen<T>(event, arg));
|
||||
|
||||
return relay.event;
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
export class StaticRouter<TContext = string> implements IClientRouter<TContext> {
|
||||
|
||||
constructor(private fn: (ctx: TContext) => boolean | Promise<boolean>) { }
|
||||
|
||||
routeCall(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
|
||||
return this.route(hub);
|
||||
}
|
||||
|
||||
routeEvent(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
|
||||
return this.route(hub);
|
||||
}
|
||||
|
||||
private async route(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
|
||||
for (const connection of hub.connections) {
|
||||
if (await Promise.resolve(this.fn(connection.ctx))) {
|
||||
return Promise.resolve(connection);
|
||||
}
|
||||
}
|
||||
|
||||
await Event.toPromise(hub.onDidChangeConnections);
|
||||
return await this.route(hub);
|
||||
}
|
||||
}
|
||||
@@ -6,12 +6,14 @@
|
||||
import * as assert from 'assert';
|
||||
import { Socket } from 'net';
|
||||
import { EventEmitter } from 'events';
|
||||
import { Protocol, PersistentProtocol } from 'vs/base/parts/ipc/node/ipc.net';
|
||||
import { Protocol, PersistentProtocol } from 'vs/base/parts/ipc/common/ipc.net';
|
||||
import { NodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
|
||||
class MessageStream {
|
||||
|
||||
private _currentComplete: ((data: Buffer) => void) | null;
|
||||
private _messages: Buffer[];
|
||||
private _currentComplete: ((data: VSBuffer) => void) | null;
|
||||
private _messages: VSBuffer[];
|
||||
|
||||
constructor(x: Protocol | PersistentProtocol) {
|
||||
this._currentComplete = null;
|
||||
@@ -36,8 +38,8 @@ class MessageStream {
|
||||
complete(msg);
|
||||
}
|
||||
|
||||
public waitForOne(): Promise<Buffer> {
|
||||
return new Promise<Buffer>((complete) => {
|
||||
public waitForOne(): Promise<VSBuffer> {
|
||||
return new Promise<VSBuffer>((complete) => {
|
||||
this._currentComplete = complete;
|
||||
this._trigger();
|
||||
});
|
||||
@@ -53,6 +55,9 @@ class EtherStream extends EventEmitter {
|
||||
}
|
||||
|
||||
write(data: Buffer, cb?: Function): boolean {
|
||||
if (!Buffer.isBuffer(data)) {
|
||||
throw new Error(`Invalid data`);
|
||||
}
|
||||
this._ether.write(this._name, data);
|
||||
return true;
|
||||
}
|
||||
@@ -122,26 +127,26 @@ suite('IPC, Socket Protocol', () => {
|
||||
|
||||
test('read/write', async () => {
|
||||
|
||||
const a = new Protocol(ether.a);
|
||||
const b = new Protocol(ether.b);
|
||||
const a = new Protocol(new NodeSocket(ether.a));
|
||||
const b = new Protocol(new NodeSocket(ether.b));
|
||||
const bMessages = new MessageStream(b);
|
||||
|
||||
a.send(Buffer.from('foobarfarboo'));
|
||||
a.send(VSBuffer.fromString('foobarfarboo'));
|
||||
const msg1 = await bMessages.waitForOne();
|
||||
assert.equal(msg1.toString(), 'foobarfarboo');
|
||||
|
||||
const buffer = Buffer.allocUnsafe(1);
|
||||
buffer.writeInt8(123, 0);
|
||||
const buffer = VSBuffer.alloc(1);
|
||||
buffer.writeUint8(123, 0);
|
||||
a.send(buffer);
|
||||
const msg2 = await bMessages.waitForOne();
|
||||
assert.equal(msg2.readInt8(0), 123);
|
||||
assert.equal(msg2.readUint8(0), 123);
|
||||
});
|
||||
|
||||
|
||||
test('read/write, object data', async () => {
|
||||
|
||||
const a = new Protocol(ether.a);
|
||||
const b = new Protocol(ether.b);
|
||||
const a = new Protocol(new NodeSocket(ether.a));
|
||||
const b = new Protocol(new NodeSocket(ether.b));
|
||||
const bMessages = new MessageStream(b);
|
||||
|
||||
const data = {
|
||||
@@ -151,7 +156,7 @@ suite('IPC, Socket Protocol', () => {
|
||||
data: 'Hello World'.split('')
|
||||
};
|
||||
|
||||
a.send(Buffer.from(JSON.stringify(data)));
|
||||
a.send(VSBuffer.fromString(JSON.stringify(data)));
|
||||
const msg = await bMessages.waitForOne();
|
||||
assert.deepEqual(JSON.parse(msg.toString()), data);
|
||||
});
|
||||
@@ -166,20 +171,20 @@ suite('PersistentProtocol reconnection', () => {
|
||||
});
|
||||
|
||||
test('acks get piggybacked with messages', async () => {
|
||||
const a = new PersistentProtocol(ether.a);
|
||||
const a = new PersistentProtocol(new NodeSocket(ether.a));
|
||||
const aMessages = new MessageStream(a);
|
||||
const b = new PersistentProtocol(ether.b);
|
||||
const b = new PersistentProtocol(new NodeSocket(ether.b));
|
||||
const bMessages = new MessageStream(b);
|
||||
|
||||
a.send(Buffer.from('a1'));
|
||||
a.send(VSBuffer.fromString('a1'));
|
||||
assert.equal(a.unacknowledgedCount, 1);
|
||||
assert.equal(b.unacknowledgedCount, 0);
|
||||
|
||||
a.send(Buffer.from('a2'));
|
||||
a.send(VSBuffer.fromString('a2'));
|
||||
assert.equal(a.unacknowledgedCount, 2);
|
||||
assert.equal(b.unacknowledgedCount, 0);
|
||||
|
||||
a.send(Buffer.from('a3'));
|
||||
a.send(VSBuffer.fromString('a3'));
|
||||
assert.equal(a.unacknowledgedCount, 3);
|
||||
assert.equal(b.unacknowledgedCount, 0);
|
||||
|
||||
@@ -198,7 +203,7 @@ suite('PersistentProtocol reconnection', () => {
|
||||
assert.equal(a.unacknowledgedCount, 3);
|
||||
assert.equal(b.unacknowledgedCount, 0);
|
||||
|
||||
b.send(Buffer.from('b1'));
|
||||
b.send(VSBuffer.fromString('b1'));
|
||||
assert.equal(a.unacknowledgedCount, 3);
|
||||
assert.equal(b.unacknowledgedCount, 1);
|
||||
|
||||
@@ -207,7 +212,7 @@ suite('PersistentProtocol reconnection', () => {
|
||||
assert.equal(a.unacknowledgedCount, 0);
|
||||
assert.equal(b.unacknowledgedCount, 1);
|
||||
|
||||
a.send(Buffer.from('a4'));
|
||||
a.send(VSBuffer.fromString('a4'));
|
||||
assert.equal(a.unacknowledgedCount, 1);
|
||||
assert.equal(b.unacknowledgedCount, 1);
|
||||
|
||||
|
||||
@@ -4,19 +4,19 @@
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import * as assert from 'assert';
|
||||
import { IChannel, IServerChannel } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { IMessagePassingProtocol, IPCServer, ClientConnectionEvent, IPCClient } from 'vs/base/parts/ipc/node/ipc';
|
||||
import { IChannel, IServerChannel, IMessagePassingProtocol, IPCServer, ClientConnectionEvent, IPCClient } from 'vs/base/parts/ipc/common/ipc';
|
||||
import { Emitter, Event } from 'vs/base/common/event';
|
||||
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
|
||||
import { canceled } from 'vs/base/common/errors';
|
||||
import { timeout } from 'vs/base/common/async';
|
||||
import { VSBuffer } from 'vs/base/common/buffer';
|
||||
|
||||
class QueueProtocol implements IMessagePassingProtocol {
|
||||
|
||||
private buffering = true;
|
||||
private buffers: Buffer[] = [];
|
||||
private buffers: VSBuffer[] = [];
|
||||
|
||||
private _onMessage = new Emitter<Buffer>({
|
||||
private _onMessage = new Emitter<VSBuffer>({
|
||||
onFirstListenerDidAdd: () => {
|
||||
for (const buffer of this.buffers) {
|
||||
this._onMessage.fire(buffer);
|
||||
@@ -33,11 +33,11 @@ class QueueProtocol implements IMessagePassingProtocol {
|
||||
readonly onMessage = this._onMessage.event;
|
||||
other: QueueProtocol;
|
||||
|
||||
send(buffer: Buffer): void {
|
||||
send(buffer: VSBuffer): void {
|
||||
this.other.receive(buffer);
|
||||
}
|
||||
|
||||
protected receive(buffer: Buffer): void {
|
||||
protected receive(buffer: VSBuffer): void {
|
||||
if (this.buffering) {
|
||||
this.buffers.push(buffer);
|
||||
} else {
|
||||
@@ -196,10 +196,10 @@ suite('Base IPC', function () {
|
||||
test('createProtocolPair', async function () {
|
||||
const [clientProtocol, serverProtocol] = createProtocolPair();
|
||||
|
||||
const b1 = Buffer.alloc(0);
|
||||
const b1 = VSBuffer.alloc(0);
|
||||
clientProtocol.send(b1);
|
||||
|
||||
const b3 = Buffer.alloc(0);
|
||||
const b3 = VSBuffer.alloc(0);
|
||||
serverProtocol.send(b3);
|
||||
|
||||
const b2 = await Event.toPromise(serverProtocol.onMessage);
|
||||
|
||||
Reference in New Issue
Block a user