Feature: Writing Execute Results to Temp File (#35)

* WIP for buffering in temporary file

* Adding support for writing to disk for buffering

* WIP - Adding file reader, factory for reader/writer

* Making long list use generics and implement IEnumerable

* Reading/Writing from file is working

* Removing unused 'skipValue' logic

* More tweaks to file buffer

Adding logic for cleaning up the temp files
Adding fix for empty/null column names

* Adding comments and cleanup

* Unit tests for FileStreamWrapper

* WIP adding more unit tests, and finishing up wiring up the output writers

* Finishing up initial unit tests

* Fixing bugs with long fields

* Squashed commit of the following:

commit df0ffc12a46cb286d801d08689964eac08ad71dd
Author: Benjamin Russell <beruss@microsoft.com>
Date:   Wed Sep 7 14:45:39 2016 -0700

    Removing last bit of async for file writing.

    We're seeing a 8x improvement of file write speeds!

commit 08a4b9f32e825512ca24d5dc03ef5acbf7cc6d94
Author: Benjamin Russell <beruss@microsoft.com>
Date:   Wed Sep 7 11:23:06 2016 -0700

    Removing async wrappers

* Rolling back test code for Program.cs

* Changes as per code review

* Fixing broken unit tests

* More fixes for codereview
This commit is contained in:
Benjamin Russell
2016-09-08 17:55:11 -07:00
committed by GitHub
parent 903eab61d1
commit 8aa3d524fc
24 changed files with 4050 additions and 195 deletions

View File

@@ -0,0 +1,50 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Represents a value returned from a read from a file stream. This is used to eliminate ref
/// parameters used in the read methods.
/// </summary>
/// <typeparam name="T">The type of the value that was read</typeparam>
public struct FileStreamReadResult<T>
{
/// <summary>
/// Whether or not the value of the field is null
/// </summary>
public bool IsNull { get; set; }
/// <summary>
/// The value of the field. If <see cref="IsNull"/> is true, this will be set to <c>default(T)</c>
/// </summary>
public T Value { get; set; }
/// <summary>
/// The total length in bytes of the value, (including the bytes used to store the length
/// of the value)
/// </summary>
/// <remarks>
/// Cell values are stored such that the length of the value is stored first, then the
/// value itself is stored. Eg, a string may be stored as 0x03 0x6C 0x6F 0x6C. Under this
/// system, the value would be "lol", the length would be 3, and the total length would be
/// 4 bytes.
/// </remarks>
public int TotalLength { get; set; }
/// <summary>
/// Constructs a new FileStreamReadResult
/// </summary>
/// <param name="value">The value of the result</param>
/// <param name="totalLength">The number of bytes for the used to store the value's length and value</param>
/// <param name="isNull">Whether or not the value is <c>null</c></param>
public FileStreamReadResult(T value, int totalLength, bool isNull)
{
Value = value;
TotalLength = totalLength;
IsNull = isNull;
}
}
}

View File

@@ -0,0 +1,282 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Diagnostics;
using System.IO;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Wrapper for a file stream, providing simplified creation, deletion, read, and write
/// functionality.
/// </summary>
public class FileStreamWrapper : IFileStreamWrapper
{
#region Member Variables
private byte[] buffer;
private int bufferDataSize;
private FileStream fileStream;
private long startOffset;
private long currentOffset;
#endregion
/// <summary>
/// Constructs a new FileStreamWrapper and initializes its state.
/// </summary>
public FileStreamWrapper()
{
// Initialize the internal state
bufferDataSize = 0;
startOffset = 0;
currentOffset = 0;
}
#region IFileStreamWrapper Implementation
/// <summary>
/// Initializes the wrapper by creating the internal buffer and opening the requested file.
/// If the file does not already exist, it will be created.
/// </summary>
/// <param name="fileName">Name of the file to open/create</param>
/// <param name="bufferLength">The length of the internal buffer</param>
/// <param name="accessMethod">
/// Whether or not the wrapper will be used for reading. If <c>true</c>, any calls to a
/// method that writes will cause an InvalidOperationException
/// </param>
public void Init(string fileName, int bufferLength, FileAccess accessMethod)
{
// Sanity check for valid buffer length, fileName, and accessMethod
if (bufferLength <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferLength), "Buffer length must be a positive value");
}
if (string.IsNullOrWhiteSpace(fileName))
{
throw new ArgumentNullException(nameof(fileName), "File name cannot be null or whitespace");
}
if (accessMethod == FileAccess.Write)
{
throw new ArgumentException("Access method cannot be write-only", nameof(fileName));
}
// Setup the buffer
buffer = new byte[bufferLength];
// Open the requested file for reading/writing, creating one if it doesn't exist
fileStream = new FileStream(fileName, FileMode.OpenOrCreate, accessMethod, FileShare.ReadWrite,
bufferLength, false /*don't use asyncio*/);
// make file hidden
FileInfo fileInfo = new FileInfo(fileName);
fileInfo.Attributes |= FileAttributes.Hidden;
}
/// <summary>
/// Reads data into a buffer from the current offset into the file
/// </summary>
/// <param name="buf">The buffer to output the read data to</param>
/// <param name="bytes">The number of bytes to read into the buffer</param>
/// <returns>The number of bytes read</returns>
public int ReadData(byte[] buf, int bytes)
{
return ReadData(buf, bytes, currentOffset);
}
/// <summary>
/// Reads data into a buffer from the specified offset into the file
/// </summary>
/// <param name="buf">The buffer to output the read data to</param>
/// <param name="bytes">The number of bytes to read into the buffer</param>
/// <param name="offset">The offset into the file to start reading bytes from</param>
/// <returns>The number of bytes read</returns>
public int ReadData(byte[] buf, int bytes, long offset)
{
// Make sure that we're initialized before performing operations
if (buffer == null)
{
throw new InvalidOperationException("FileStreamWrapper must be initialized before performing operations");
}
MoveTo(offset);
int bytesCopied = 0;
while (bytesCopied < bytes)
{
int bufferOffset, bytesToCopy;
GetByteCounts(bytes, bytesCopied, out bufferOffset, out bytesToCopy);
Buffer.BlockCopy(buffer, bufferOffset, buf, bytesCopied, bytesToCopy);
bytesCopied += bytesToCopy;
if (bytesCopied < bytes && // did not get all the bytes yet
bufferDataSize == buffer.Length) // since current data buffer is full we should continue reading the file
{
// move forward one full length of the buffer
MoveTo(startOffset + buffer.Length);
}
else
{
// copied all the bytes requested or possible, adjust the current buffer pointer
currentOffset += bytesToCopy;
break;
}
}
return bytesCopied;
}
/// <summary>
/// Writes data to the underlying filestream, with buffering.
/// </summary>
/// <param name="buf">The buffer of bytes to write to the filestream</param>
/// <param name="bytes">The number of bytes to write</param>
/// <returns>The number of bytes written</returns>
public int WriteData(byte[] buf, int bytes)
{
// Make sure that we're initialized before performing operations
if (buffer == null)
{
throw new InvalidOperationException("FileStreamWrapper must be initialized before performing operations");
}
if (!fileStream.CanWrite)
{
throw new InvalidOperationException("This FileStreamWrapper canot be used for writing");
}
int bytesCopied = 0;
while (bytesCopied < bytes)
{
int bufferOffset, bytesToCopy;
GetByteCounts(bytes, bytesCopied, out bufferOffset, out bytesToCopy);
Buffer.BlockCopy(buf, bytesCopied, buffer, bufferOffset, bytesToCopy);
bytesCopied += bytesToCopy;
// adjust the current buffer pointer
currentOffset += bytesToCopy;
if (bytesCopied < bytes) // did not get all the bytes yet
{
Debug.Assert((int)(currentOffset - startOffset) == buffer.Length);
// flush buffer
Flush();
}
}
Debug.Assert(bytesCopied == bytes);
return bytesCopied;
}
/// <summary>
/// Flushes the internal buffer to the filestream
/// </summary>
public void Flush()
{
// Make sure that we're initialized before performing operations
if (buffer == null)
{
throw new InvalidOperationException("FileStreamWrapper must be initialized before performing operations");
}
if (!fileStream.CanWrite)
{
throw new InvalidOperationException("This FileStreamWrapper cannot be used for writing");
}
// Make sure we are at the right place in the file
Debug.Assert(fileStream.Position == startOffset);
int bytesToWrite = (int)(currentOffset - startOffset);
fileStream.Write(buffer, 0, bytesToWrite);
startOffset += bytesToWrite;
fileStream.Flush();
Debug.Assert(startOffset == currentOffset);
}
/// <summary>
/// Deletes the given file (ideally, created with this wrapper) from the filesystem
/// </summary>
/// <param name="fileName">The path to the file to delete</param>
public static void DeleteFile(string fileName)
{
File.Delete(fileName);
}
#endregion
/// <summary>
/// Perform calculations to determine how many bytes to copy and what the new buffer offset
/// will be for copying.
/// </summary>
/// <param name="bytes">Number of bytes requested to copy</param>
/// <param name="bytesCopied">Number of bytes copied so far</param>
/// <param name="bufferOffset">New offset to start copying from/to</param>
/// <param name="bytesToCopy">Number of bytes to copy in this iteration</param>
private void GetByteCounts(int bytes, int bytesCopied, out int bufferOffset, out int bytesToCopy)
{
bufferOffset = (int) (currentOffset - startOffset);
bytesToCopy = bytes - bytesCopied;
if (bytesToCopy > buffer.Length - bufferOffset)
{
bytesToCopy = buffer.Length - bufferOffset;
}
}
/// <summary>
/// Moves the internal buffer to the specified offset into the file
/// </summary>
/// <param name="offset">Offset into the file to move to</param>
private void MoveTo(long offset)
{
if (buffer.Length > bufferDataSize || // buffer is not completely filled
offset < startOffset || // before current buffer start
offset >= (startOffset + buffer.Length)) // beyond current buffer end
{
// init the offset
startOffset = offset;
// position file pointer
fileStream.Seek(startOffset, SeekOrigin.Begin);
// fill in the buffer
bufferDataSize = fileStream.Read(buffer, 0, buffer.Length);
}
// make sure to record where we are
currentOffset = offset;
}
#region IDisposable Implementation
private bool disposed;
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing && fileStream != null)
{
if(fileStream.CanWrite) { Flush(); }
fileStream.Dispose();
}
disposed = true;
}
~FileStreamWrapper()
{
Dispose(false);
}
#endregion
}
}

View File

@@ -0,0 +1,22 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Interface for a factory that creates filesystem readers/writers
/// </summary>
public interface IFileStreamFactory
{
string CreateFile();
IFileStreamReader GetReader(string fileName);
IFileStreamWriter GetWriter(string fileName, int maxCharsToStore, int maxXmlCharsToStore);
void DisposeFile(string fileName);
}
}

View File

@@ -0,0 +1,35 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Collections.Generic;
using System.Data.SqlTypes;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Interface for a object that reads from the filesystem
/// </summary>
public interface IFileStreamReader : IDisposable
{
object[] ReadRow(long offset, IEnumerable<DbColumnWrapper> columns);
FileStreamReadResult<short> ReadInt16(long i64Offset);
FileStreamReadResult<int> ReadInt32(long i64Offset);
FileStreamReadResult<long> ReadInt64(long i64Offset);
FileStreamReadResult<byte> ReadByte(long i64Offset);
FileStreamReadResult<char> ReadChar(long i64Offset);
FileStreamReadResult<bool> ReadBoolean(long i64Offset);
FileStreamReadResult<float> ReadSingle(long i64Offset);
FileStreamReadResult<double> ReadDouble(long i64Offset);
FileStreamReadResult<SqlDecimal> ReadSqlDecimal(long i64Offset);
FileStreamReadResult<decimal> ReadDecimal(long i64Offset);
FileStreamReadResult<DateTime> ReadDateTime(long i64Offset);
FileStreamReadResult<TimeSpan> ReadTimeSpan(long i64Offset);
FileStreamReadResult<string> ReadString(long i64Offset);
FileStreamReadResult<byte[]> ReadBytes(long i64Offset);
FileStreamReadResult<DateTimeOffset> ReadDateTimeOffset(long i64Offset);
}
}

View File

@@ -0,0 +1,22 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.IO;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Interface for a wrapper around a filesystem reader/writer, mainly for unit testing purposes
/// </summary>
public interface IFileStreamWrapper : IDisposable
{
void Init(string fileName, int bufferSize, FileAccess fileAccessMode);
int ReadData(byte[] buffer, int bytes);
int ReadData(byte[] buffer, int bytes, long fileOffset);
int WriteData(byte[] buffer, int bytes);
void Flush();
}
}

View File

@@ -0,0 +1,35 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Data.SqlTypes;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Interface for a object that writes to a filesystem wrapper
/// </summary>
public interface IFileStreamWriter : IDisposable
{
int WriteRow(StorageDataReader dataReader);
int WriteNull();
int WriteInt16(short val);
int WriteInt32(int val);
int WriteInt64(long val);
int WriteByte(byte val);
int WriteChar(char val);
int WriteBoolean(bool val);
int WriteSingle(float val);
int WriteDouble(double val);
int WriteDecimal(decimal val);
int WriteSqlDecimal(SqlDecimal val);
int WriteDateTime(DateTime val);
int WriteDateTimeOffset(DateTimeOffset dtoVal);
int WriteTimeSpan(TimeSpan val);
int WriteString(string val);
int WriteBytes(byte[] bytes, int length);
void FlushBuffer();
}
}

View File

@@ -0,0 +1,64 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System.IO;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Factory that creates file reader/writers that process rows in an internal, non-human readable file format
/// </summary>
public class ServiceBufferFileStreamFactory : IFileStreamFactory
{
/// <summary>
/// Creates a new temporary file
/// </summary>
/// <returns>The name of the temporary file</returns>
public string CreateFile()
{
return Path.GetTempFileName();
}
/// <summary>
/// Creates a new <see cref="ServiceBufferFileStreamReader"/> for reading values back from
/// an SSMS formatted buffer file
/// </summary>
/// <param name="fileName">The file to read values from</param>
/// <returns>A <see cref="ServiceBufferFileStreamReader"/></returns>
public IFileStreamReader GetReader(string fileName)
{
return new ServiceBufferFileStreamReader(new FileStreamWrapper(), fileName);
}
/// <summary>
/// Creates a new <see cref="ServiceBufferFileStreamWriter"/> for writing values out to an
/// SSMS formatted buffer file
/// </summary>
/// <param name="fileName">The file to write values to</param>
/// <param name="maxCharsToStore">The maximum number of characters to store from long text fields</param>
/// <param name="maxXmlCharsToStore">The maximum number of characters to store from xml fields</param>
/// <returns>A <see cref="ServiceBufferFileStreamWriter"/></returns>
public IFileStreamWriter GetWriter(string fileName, int maxCharsToStore, int maxXmlCharsToStore)
{
return new ServiceBufferFileStreamWriter(new FileStreamWrapper(), fileName, maxCharsToStore, maxXmlCharsToStore);
}
/// <summary>
/// Disposes of a file created via this factory
/// </summary>
/// <param name="fileName">The file to dispose of</param>
public void DisposeFile(string fileName)
{
try
{
FileStreamWrapper.DeleteFile(fileName);
}
catch
{
// If we have problems deleting the file from a temp location, we don't really care
}
}
}
}

View File

@@ -0,0 +1,889 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Collections.Generic;
using System.Data.SqlTypes;
using System.Diagnostics;
using System.IO;
using System.Text;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Reader for SSMS formatted file streams
/// </summary>
public class ServiceBufferFileStreamReader : IFileStreamReader
{
// Most of this code is based on code from the Microsoft.SqlServer.Management.UI.Grid, SSMS DataStorage
// $\Data Tools\SSMS_XPlat\sql\ssms\core\DataStorage\src\FileStreamReader.cs
private const int DefaultBufferSize = 8192;
#region Member Variables
private byte[] buffer;
private readonly IFileStreamWrapper fileStream;
#endregion
/// <summary>
/// Constructs a new ServiceBufferFileStreamReader and initializes its state
/// </summary>
/// <param name="fileWrapper">The filestream wrapper to read from</param>
/// <param name="fileName">The name of the file to read from</param>
public ServiceBufferFileStreamReader(IFileStreamWrapper fileWrapper, string fileName)
{
// Open file for reading/writing
fileStream = fileWrapper;
fileStream.Init(fileName, DefaultBufferSize, FileAccess.Read);
// Create internal buffer
buffer = new byte[DefaultBufferSize];
}
#region IFileStreamStorage Implementation
/// <summary>
/// Reads a row from the file, based on the columns provided
/// </summary>
/// <param name="fileOffset">Offset into the file where the row starts</param>
/// <param name="columns">The columns that were encoded</param>
/// <returns>The objects from the row</returns>
public object[] ReadRow(long fileOffset, IEnumerable<DbColumnWrapper> columns)
{
// Initialize for the loop
long currentFileOffset = fileOffset;
List<object> results = new List<object>();
// Iterate over the columns
foreach (DbColumnWrapper column in columns)
{
// We will pivot based on the type of the column
Type colType;
if (column.IsSqlVariant)
{
// For SQL Variant columns, the type is written first in string format
FileStreamReadResult<string> sqlVariantTypeResult = ReadString(currentFileOffset);
currentFileOffset += sqlVariantTypeResult.TotalLength;
// If the typename is null, then the whole value is null
if (sqlVariantTypeResult.IsNull)
{
results.Add(null);
continue;
}
// The typename is stored in the string
colType = Type.GetType(sqlVariantTypeResult.Value);
// Workaround .NET bug, see sqlbu# 440643 and vswhidbey# 599834
// TODO: Is this workaround necessary for .NET Core?
if (colType == null && sqlVariantTypeResult.Value == "System.Data.SqlTypes.SqlSingle")
{
colType = typeof(SqlSingle);
}
}
else
{
colType = column.DataType;
}
if (colType == typeof(string))
{
// String - most frequently used data type
FileStreamReadResult<string> result = ReadString(currentFileOffset);
currentFileOffset += result.TotalLength;
results.Add(result.IsNull ? null : result.Value);
}
else if (colType == typeof(SqlString))
{
// SqlString
FileStreamReadResult<string> result = ReadString(currentFileOffset);
currentFileOffset += result.TotalLength;
results.Add(result.IsNull ? null : (SqlString) result.Value);
}
else if (colType == typeof(short))
{
// Int16
FileStreamReadResult<short> result = ReadInt16(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlInt16))
{
// SqlInt16
FileStreamReadResult<short> result = ReadInt16(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlInt16)result.Value);
}
}
else if (colType == typeof(int))
{
// Int32
FileStreamReadResult<int> result = ReadInt32(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlInt32))
{
// SqlInt32
FileStreamReadResult<int> result = ReadInt32(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlInt32)result.Value);
}
}
else if (colType == typeof(long))
{
// Int64
FileStreamReadResult<long> result = ReadInt64(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlInt64))
{
// SqlInt64
FileStreamReadResult<long> result = ReadInt64(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlInt64)result.Value);
}
}
else if (colType == typeof(byte))
{
// byte
FileStreamReadResult<byte> result = ReadByte(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlByte))
{
// SqlByte
FileStreamReadResult<byte> result = ReadByte(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlByte)result.Value);
}
}
else if (colType == typeof(char))
{
// Char
FileStreamReadResult<char> result = ReadChar(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(bool))
{
// Bool
FileStreamReadResult<bool> result = ReadBoolean(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlBoolean))
{
// SqlBoolean
FileStreamReadResult<bool> result = ReadBoolean(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlBoolean)result.Value);
}
}
else if (colType == typeof(double))
{
// double
FileStreamReadResult<double> result = ReadDouble(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlDouble))
{
// SqlByte
FileStreamReadResult<double> result = ReadDouble(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlDouble)result.Value);
}
}
else if (colType == typeof(float))
{
// float
FileStreamReadResult<float> result = ReadSingle(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlSingle))
{
// SqlSingle
FileStreamReadResult<float> result = ReadSingle(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlSingle)result.Value);
}
}
else if (colType == typeof(decimal))
{
// Decimal
FileStreamReadResult<decimal> result = ReadDecimal(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlDecimal))
{
// SqlDecimal
FileStreamReadResult<SqlDecimal> result = ReadSqlDecimal(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(DateTime))
{
// DateTime
FileStreamReadResult<DateTime> result = ReadDateTime(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlDateTime))
{
// SqlDateTime
FileStreamReadResult<DateTime> result = ReadDateTime(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add((SqlDateTime)result.Value);
}
}
else if (colType == typeof(DateTimeOffset))
{
// DateTimeOffset
FileStreamReadResult<DateTimeOffset> result = ReadDateTimeOffset(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(TimeSpan))
{
// TimeSpan
FileStreamReadResult<TimeSpan> result = ReadTimeSpan(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(byte[]))
{
// Byte Array
FileStreamReadResult<byte[]> result = ReadBytes(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull || (column.IsUdt && result.Value.Length == 0))
{
results.Add(null);
}
else
{
results.Add(result.Value);
}
}
else if (colType == typeof(SqlBytes))
{
// SqlBytes
FileStreamReadResult<byte[]> result = ReadBytes(currentFileOffset);
currentFileOffset += result.TotalLength;
results.Add(result.IsNull ? null : new SqlBytes(result.Value));
}
else if (colType == typeof(SqlBinary))
{
// SqlBinary
FileStreamReadResult<byte[]> result = ReadBytes(currentFileOffset);
currentFileOffset += result.TotalLength;
results.Add(result.IsNull ? null : new SqlBinary(result.Value));
}
else if (colType == typeof(SqlGuid))
{
// SqlGuid
FileStreamReadResult<byte[]> result = ReadBytes(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(new SqlGuid(result.Value));
}
}
else if (colType == typeof(SqlMoney))
{
// SqlMoney
FileStreamReadResult<decimal> result = ReadDecimal(currentFileOffset);
currentFileOffset += result.TotalLength;
if (result.IsNull)
{
results.Add(null);
}
else
{
results.Add(new SqlMoney(result.Value));
}
}
else
{
// Treat everything else as a string
FileStreamReadResult<string> result = ReadString(currentFileOffset);
currentFileOffset += result.TotalLength;
results.Add(result.IsNull ? null : result.Value);
}
}
return results.ToArray();
}
/// <summary>
/// Reads a short from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the short from</param>
/// <returns>A short</returns>
public FileStreamReadResult<short> ReadInt16(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 2, "Invalid data length");
bool isNull = length.ValueLength == 0;
short val = default(short);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = BitConverter.ToInt16(buffer, 0);
}
return new FileStreamReadResult<short>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a int from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the int from</param>
/// <returns>An int</returns>
public FileStreamReadResult<int> ReadInt32(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 4, "Invalid data length");
bool isNull = length.ValueLength == 0;
int val = default(int);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = BitConverter.ToInt32(buffer, 0);
}
return new FileStreamReadResult<int>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a long from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the long from</param>
/// <returns>A long</returns>
public FileStreamReadResult<long> ReadInt64(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 8, "Invalid data length");
bool isNull = length.ValueLength == 0;
long val = default(long);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = BitConverter.ToInt64(buffer, 0);
}
return new FileStreamReadResult<long>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a byte from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the byte from</param>
/// <returns>A byte</returns>
public FileStreamReadResult<byte> ReadByte(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 1, "Invalid data length");
bool isNull = length.ValueLength == 0;
byte val = default(byte);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = buffer[0];
}
return new FileStreamReadResult<byte>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a char from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the char from</param>
/// <returns>A char</returns>
public FileStreamReadResult<char> ReadChar(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 2, "Invalid data length");
bool isNull = length.ValueLength == 0;
char val = default(char);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = BitConverter.ToChar(buffer, 0);
}
return new FileStreamReadResult<char>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a bool from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the bool from</param>
/// <returns>A bool</returns>
public FileStreamReadResult<bool> ReadBoolean(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 1, "Invalid data length");
bool isNull = length.ValueLength == 0;
bool val = default(bool);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = buffer[0] == 0x01;
}
return new FileStreamReadResult<bool>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a single from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the single from</param>
/// <returns>A single</returns>
public FileStreamReadResult<float> ReadSingle(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 4, "Invalid data length");
bool isNull = length.ValueLength == 0;
float val = default(float);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = BitConverter.ToSingle(buffer, 0);
}
return new FileStreamReadResult<float>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a double from the file at the offset provided
/// </summary>
/// <param name="fileOffset">Offset into the file to read the double from</param>
/// <returns>A double</returns>
public FileStreamReadResult<double> ReadDouble(long fileOffset)
{
LengthResult length = ReadLength(fileOffset);
Debug.Assert(length.ValueLength == 0 || length.ValueLength == 8, "Invalid data length");
bool isNull = length.ValueLength == 0;
double val = default(double);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
val = BitConverter.ToDouble(buffer, 0);
}
return new FileStreamReadResult<double>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a SqlDecimal from the file at the offset provided
/// </summary>
/// <param name="offset">Offset into the file to read the SqlDecimal from</param>
/// <returns>A SqlDecimal</returns>
public FileStreamReadResult<SqlDecimal> ReadSqlDecimal(long offset)
{
LengthResult length = ReadLength(offset);
Debug.Assert(length.ValueLength == 0 || (length.ValueLength - 3)%4 == 0,
string.Format("Invalid data length: {0}", length.ValueLength));
bool isNull = length.ValueLength == 0;
SqlDecimal val = default(SqlDecimal);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
int[] arrInt32 = new int[(length.ValueLength - 3)/4];
Buffer.BlockCopy(buffer, 3, arrInt32, 0, length.ValueLength - 3);
val = new SqlDecimal(buffer[0], buffer[1], 1 == buffer[2], arrInt32);
}
return new FileStreamReadResult<SqlDecimal>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a decimal from the file at the offset provided
/// </summary>
/// <param name="offset">Offset into the file to read the decimal from</param>
/// <returns>A decimal</returns>
public FileStreamReadResult<decimal> ReadDecimal(long offset)
{
LengthResult length = ReadLength(offset);
Debug.Assert(length.ValueLength%4 == 0, "Invalid data length");
bool isNull = length.ValueLength == 0;
decimal val = default(decimal);
if (!isNull)
{
fileStream.ReadData(buffer, length.ValueLength);
int[] arrInt32 = new int[length.ValueLength/4];
Buffer.BlockCopy(buffer, 0, arrInt32, 0, length.ValueLength);
val = new decimal(arrInt32);
}
return new FileStreamReadResult<decimal>(val, length.TotalLength, isNull);
}
/// <summary>
/// Reads a DateTime from the file at the offset provided
/// </summary>
/// <param name="offset">Offset into the file to read the DateTime from</param>
/// <returns>A DateTime</returns>
public FileStreamReadResult<DateTime> ReadDateTime(long offset)
{
FileStreamReadResult<long> ticks = ReadInt64(offset);
DateTime val = default(DateTime);
if (!ticks.IsNull)
{
val = new DateTime(ticks.Value);
}
return new FileStreamReadResult<DateTime>(val, ticks.TotalLength, ticks.IsNull);
}
/// <summary>
/// Reads a DateTimeOffset from the file at the offset provided
/// </summary>
/// <param name="offset">Offset into the file to read the DateTimeOffset from</param>
/// <returns>A DateTimeOffset</returns>
public FileStreamReadResult<DateTimeOffset> ReadDateTimeOffset(long offset)
{
// DateTimeOffset is represented by DateTime.Ticks followed by TimeSpan.Ticks
// both as Int64 values
// read the DateTime ticks
DateTimeOffset val = default(DateTimeOffset);
FileStreamReadResult<long> dateTimeTicks = ReadInt64(offset);
int totalLength = dateTimeTicks.TotalLength;
if (dateTimeTicks.TotalLength > 0 && !dateTimeTicks.IsNull)
{
// read the TimeSpan ticks
FileStreamReadResult<long> timeSpanTicks = ReadInt64(offset + dateTimeTicks.TotalLength);
Debug.Assert(!timeSpanTicks.IsNull, "TimeSpan ticks cannot be null if DateTime ticks are not null!");
totalLength += timeSpanTicks.TotalLength;
// build the DateTimeOffset
val = new DateTimeOffset(new DateTime(dateTimeTicks.Value), new TimeSpan(timeSpanTicks.Value));
}
return new FileStreamReadResult<DateTimeOffset>(val, totalLength, dateTimeTicks.IsNull);
}
/// <summary>
/// Reads a TimeSpan from the file at the offset provided
/// </summary>
/// <param name="offset">Offset into the file to read the TimeSpan from</param>
/// <returns>A TimeSpan</returns>
public FileStreamReadResult<TimeSpan> ReadTimeSpan(long offset)
{
FileStreamReadResult<long> timeSpanTicks = ReadInt64(offset);
TimeSpan val = default(TimeSpan);
if (!timeSpanTicks.IsNull)
{
val = new TimeSpan(timeSpanTicks.Value);
}
return new FileStreamReadResult<TimeSpan>(val, timeSpanTicks.TotalLength, timeSpanTicks.IsNull);
}
/// <summary>
/// Reads a string from the file at the offset provided
/// </summary>
/// <param name="offset">Offset into the file to read the string from</param>
/// <returns>A string</returns>
public FileStreamReadResult<string> ReadString(long offset)
{
LengthResult fieldLength = ReadLength(offset);
Debug.Assert(fieldLength.ValueLength%2 == 0, "Invalid data length");
if (fieldLength.ValueLength == 0) // there is no data
{
// If the total length is 5 (5 bytes for length, 0 for value), then the string is empty
// Otherwise, the string is null
bool isNull = fieldLength.TotalLength != 5;
return new FileStreamReadResult<string>(isNull ? null : string.Empty,
fieldLength.TotalLength, isNull);
}
// positive length
AssureBufferLength(fieldLength.ValueLength);
fileStream.ReadData(buffer, fieldLength.ValueLength);
return new FileStreamReadResult<string>(Encoding.Unicode.GetString(buffer, 0, fieldLength.ValueLength), fieldLength.TotalLength, false);
}
/// <summary>
/// Reads bytes from the file at the offset provided
/// </summary>
/// <param name="offset">Offset into the file to read the bytes from</param>
/// <returns>A byte array</returns>
public FileStreamReadResult<byte[]> ReadBytes(long offset)
{
LengthResult fieldLength = ReadLength(offset);
if (fieldLength.ValueLength == 0)
{
// If the total length is 5 (5 bytes for length, 0 for value), then the byte array is 0x
// Otherwise, the byte array is null
bool isNull = fieldLength.TotalLength != 5;
return new FileStreamReadResult<byte[]>(isNull ? null : new byte[0],
fieldLength.TotalLength, isNull);
}
// positive length
byte[] val = new byte[fieldLength.ValueLength];
fileStream.ReadData(val, fieldLength.ValueLength);
return new FileStreamReadResult<byte[]>(val, fieldLength.TotalLength, false);
}
/// <summary>
/// Reads the length of a field at the specified offset in the file
/// </summary>
/// <param name="offset">Offset into the file to read the field length from</param>
/// <returns>A LengthResult</returns>
internal LengthResult ReadLength(long offset)
{
// read in length information
int lengthValue;
int lengthLength = fileStream.ReadData(buffer, 1, offset);
if (buffer[0] != 0xFF)
{
// one byte is enough
lengthValue = Convert.ToInt32(buffer[0]);
}
else
{
// read in next 4 bytes
lengthLength += fileStream.ReadData(buffer, 4);
// reconstruct the length
lengthValue = BitConverter.ToInt32(buffer, 0);
}
return new LengthResult {LengthLength = lengthLength, ValueLength = lengthValue};
}
#endregion
/// <summary>
/// Internal struct used for representing the length of a field from the file
/// </summary>
internal struct LengthResult
{
/// <summary>
/// How many bytes the length takes up
/// </summary>
public int LengthLength { get; set; }
/// <summary>
/// How many bytes the value takes up
/// </summary>
public int ValueLength { get; set; }
/// <summary>
/// <see cref="LengthLength"/> + <see cref="ValueLength"/>
/// </summary>
public int TotalLength
{
get { return LengthLength + ValueLength; }
}
}
/// <summary>
/// Creates a new buffer that is of the specified length if the buffer is not already
/// at least as long as specified.
/// </summary>
/// <param name="newBufferLength">The minimum buffer size</param>
private void AssureBufferLength(int newBufferLength)
{
if (buffer.Length < newBufferLength)
{
buffer = new byte[newBufferLength];
}
}
#region IDisposable Implementation
private bool disposed;
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing)
{
fileStream.Dispose();
}
disposed = true;
}
~ServiceBufferFileStreamReader()
{
Dispose(false);
}
#endregion
}
}

View File

@@ -0,0 +1,749 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Data.SqlTypes;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Writer for SSMS formatted file streams
/// </summary>
public class ServiceBufferFileStreamWriter : IFileStreamWriter
{
// Most of this code is based on code from the Microsoft.SqlServer.Management.UI.Grid, SSMS DataStorage
// $\Data Tools\SSMS_XPlat\sql\ssms\core\DataStorage\src\FileStreamWriter.cs
#region Properties
public const int DefaultBufferLength = 8192;
private int MaxCharsToStore { get; set; }
private int MaxXmlCharsToStore { get; set; }
private IFileStreamWrapper FileStream { get; set; }
private byte[] byteBuffer;
private readonly short[] shortBuffer;
private readonly int[] intBuffer;
private readonly long[] longBuffer;
private readonly char[] charBuffer;
private readonly double[] doubleBuffer;
private readonly float[] floatBuffer;
#endregion
/// <summary>
/// Constructs a new writer
/// </summary>
/// <param name="fileWrapper">The file wrapper to use as the underlying file stream</param>
/// <param name="fileName">Name of the file to write to</param>
/// <param name="maxCharsToStore">Maximum number of characters to store for long text fields</param>
/// <param name="maxXmlCharsToStore">Maximum number of characters to store for XML fields</param>
public ServiceBufferFileStreamWriter(IFileStreamWrapper fileWrapper, string fileName, int maxCharsToStore, int maxXmlCharsToStore)
{
// open file for reading/writing
FileStream = fileWrapper;
FileStream.Init(fileName, DefaultBufferLength, FileAccess.ReadWrite);
// create internal buffer
byteBuffer = new byte[DefaultBufferLength];
// Create internal buffers for blockcopy of contents to byte array
// Note: We create them now to avoid the overhead of creating a new array for every write call
shortBuffer = new short[1];
intBuffer = new int[1];
longBuffer = new long[1];
charBuffer = new char[1];
doubleBuffer = new double[1];
floatBuffer = new float[1];
// Store max chars to store
MaxCharsToStore = maxCharsToStore;
MaxXmlCharsToStore = maxXmlCharsToStore;
}
#region IFileStreamWriter Implementation
/// <summary>
/// Writes an entire row to the file stream
/// </summary>
/// <param name="reader">A primed reader</param>
/// <returns>Number of bytes used to write the row</returns>
public int WriteRow(StorageDataReader reader)
{
// Determine if we have any long fields
bool hasLongFields = reader.Columns.Any(column => column.IsLong);
object[] values = new object[reader.Columns.Length];
int rowBytes = 0;
if (!hasLongFields)
{
// get all record values in one shot if there are no extra long fields
reader.GetValues(values);
}
// Loop over all the columns and write the values to the temp file
for (int i = 0; i < reader.Columns.Length; i++)
{
DbColumnWrapper ci = reader.Columns[i];
if (hasLongFields)
{
if (reader.IsDBNull(i))
{
// Need special case for DBNull because
// reader.GetValue doesn't return DBNull in case of SqlXml and CLR type
values[i] = DBNull.Value;
}
else
{
if (!ci.IsLong)
{
// not a long field
values[i] = reader.GetValue(i);
}
else
{
// this is a long field
if (ci.IsBytes)
{
values[i] = reader.GetBytesWithMaxCapacity(i, MaxCharsToStore);
}
else if (ci.IsChars)
{
Debug.Assert(MaxCharsToStore > 0);
values[i] = reader.GetCharsWithMaxCapacity(i,
ci.IsXml ? MaxXmlCharsToStore : MaxCharsToStore);
}
else if (ci.IsXml)
{
Debug.Assert(MaxXmlCharsToStore > 0);
values[i] = reader.GetXmlWithMaxCapacity(i, MaxXmlCharsToStore);
}
else
{
// we should never get here
Debug.Assert(false);
}
}
}
}
Type tVal = values[i].GetType(); // get true type of the object
if (tVal == typeof(DBNull))
{
rowBytes += WriteNull();
}
else
{
if (ci.IsSqlVariant)
{
// serialize type information as a string before the value
string val = tVal.ToString();
rowBytes += WriteString(val);
}
if (tVal == typeof(string))
{
// String - most frequently used data type
string val = (string)values[i];
rowBytes += WriteString(val);
}
else if (tVal == typeof(SqlString))
{
// SqlString
SqlString val = (SqlString)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteString(val.Value);
}
}
else if (tVal == typeof(short))
{
// Int16
short val = (short)values[i];
rowBytes += WriteInt16(val);
}
else if (tVal == typeof(SqlInt16))
{
// SqlInt16
SqlInt16 val = (SqlInt16)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteInt16(val.Value);
}
}
else if (tVal == typeof(int))
{
// Int32
int val = (int)values[i];
rowBytes += WriteInt32(val);
}
else if (tVal == typeof(SqlInt32))
{
// SqlInt32
SqlInt32 val = (SqlInt32)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteInt32(val.Value);
}
}
else if (tVal == typeof(long))
{
// Int64
long val = (long)values[i];
rowBytes += WriteInt64(val);
}
else if (tVal == typeof(SqlInt64))
{
// SqlInt64
SqlInt64 val = (SqlInt64)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteInt64(val.Value);
}
}
else if (tVal == typeof(byte))
{
// Byte
byte val = (byte)values[i];
rowBytes += WriteByte(val);
}
else if (tVal == typeof(SqlByte))
{
// SqlByte
SqlByte val = (SqlByte)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteByte(val.Value);
}
}
else if (tVal == typeof(char))
{
// Char
char val = (char)values[i];
rowBytes += WriteChar(val);
}
else if (tVal == typeof(bool))
{
// Boolean
bool val = (bool)values[i];
rowBytes += WriteBoolean(val);
}
else if (tVal == typeof(SqlBoolean))
{
// SqlBoolean
SqlBoolean val = (SqlBoolean)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteBoolean(val.Value);
}
}
else if (tVal == typeof(double))
{
// Double
double val = (double)values[i];
rowBytes += WriteDouble(val);
}
else if (tVal == typeof(SqlDouble))
{
// SqlDouble
SqlDouble val = (SqlDouble)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteDouble(val.Value);
}
}
else if (tVal == typeof(SqlSingle))
{
// SqlSingle
SqlSingle val = (SqlSingle)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteSingle(val.Value);
}
}
else if (tVal == typeof(decimal))
{
// Decimal
decimal val = (decimal)values[i];
rowBytes += WriteDecimal(val);
}
else if (tVal == typeof(SqlDecimal))
{
// SqlDecimal
SqlDecimal val = (SqlDecimal)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteSqlDecimal(val);
}
}
else if (tVal == typeof(DateTime))
{
// DateTime
DateTime val = (DateTime)values[i];
rowBytes += WriteDateTime(val);
}
else if (tVal == typeof(DateTimeOffset))
{
// DateTimeOffset
DateTimeOffset val = (DateTimeOffset)values[i];
rowBytes += WriteDateTimeOffset(val);
}
else if (tVal == typeof(SqlDateTime))
{
// SqlDateTime
SqlDateTime val = (SqlDateTime)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteDateTime(val.Value);
}
}
else if (tVal == typeof(TimeSpan))
{
// TimeSpan
TimeSpan val = (TimeSpan)values[i];
rowBytes += WriteTimeSpan(val);
}
else if (tVal == typeof(byte[]))
{
// Bytes
byte[] val = (byte[])values[i];
rowBytes += WriteBytes(val, val.Length);
}
else if (tVal == typeof(SqlBytes))
{
// SqlBytes
SqlBytes val = (SqlBytes)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteBytes(val.Value, val.Value.Length);
}
}
else if (tVal == typeof(SqlBinary))
{
// SqlBinary
SqlBinary val = (SqlBinary)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteBytes(val.Value, val.Value.Length);
}
}
else if (tVal == typeof(SqlGuid))
{
// SqlGuid
SqlGuid val = (SqlGuid)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
byte[] bytesVal = val.ToByteArray();
rowBytes += WriteBytes(bytesVal, bytesVal.Length);
}
}
else if (tVal == typeof(SqlMoney))
{
// SqlMoney
SqlMoney val = (SqlMoney)values[i];
if (val.IsNull)
{
rowBytes += WriteNull();
}
else
{
rowBytes += WriteDecimal(val.Value);
}
}
else
{
// treat everything else as string
string val = values[i].ToString();
rowBytes += WriteString(val);
}
}
}
// Flush the buffer after every row
FlushBuffer();
return rowBytes;
}
/// <summary>
/// Writes null to the file as one 0x00 byte
/// </summary>
/// <returns>Number of bytes used to store the null</returns>
public int WriteNull()
{
byteBuffer[0] = 0x00;
return FileStream.WriteData(byteBuffer, 1);
}
/// <summary>
/// Writes a short to the file
/// </summary>
/// <returns>Number of bytes used to store the short</returns>
public int WriteInt16(short val)
{
byteBuffer[0] = 0x02; // length
shortBuffer[0] = val;
Buffer.BlockCopy(shortBuffer, 0, byteBuffer, 1, 2);
return FileStream.WriteData(byteBuffer, 3);
}
/// <summary>
/// Writes a int to the file
/// </summary>
/// <returns>Number of bytes used to store the int</returns>
public int WriteInt32(int val)
{
byteBuffer[0] = 0x04; // length
intBuffer[0] = val;
Buffer.BlockCopy(intBuffer, 0, byteBuffer, 1, 4);
return FileStream.WriteData(byteBuffer, 5);
}
/// <summary>
/// Writes a long to the file
/// </summary>
/// <returns>Number of bytes used to store the long</returns>
public int WriteInt64(long val)
{
byteBuffer[0] = 0x08; // length
longBuffer[0] = val;
Buffer.BlockCopy(longBuffer, 0, byteBuffer, 1, 8);
return FileStream.WriteData(byteBuffer, 9);
}
/// <summary>
/// Writes a char to the file
/// </summary>
/// <returns>Number of bytes used to store the char</returns>
public int WriteChar(char val)
{
byteBuffer[0] = 0x02; // length
charBuffer[0] = val;
Buffer.BlockCopy(charBuffer, 0, byteBuffer, 1, 2);
return FileStream.WriteData(byteBuffer, 3);
}
/// <summary>
/// Writes a bool to the file
/// </summary>
/// <returns>Number of bytes used to store the bool</returns>
public int WriteBoolean(bool val)
{
byteBuffer[0] = 0x01; // length
byteBuffer[1] = (byte) (val ? 0x01 : 0x00);
return FileStream.WriteData(byteBuffer, 2);
}
/// <summary>
/// Writes a byte to the file
/// </summary>
/// <returns>Number of bytes used to store the byte</returns>
public int WriteByte(byte val)
{
byteBuffer[0] = 0x01; // length
byteBuffer[1] = val;
return FileStream.WriteData(byteBuffer, 2);
}
/// <summary>
/// Writes a float to the file
/// </summary>
/// <returns>Number of bytes used to store the float</returns>
public int WriteSingle(float val)
{
byteBuffer[0] = 0x04; // length
floatBuffer[0] = val;
Buffer.BlockCopy(floatBuffer, 0, byteBuffer, 1, 4);
return FileStream.WriteData(byteBuffer, 5);
}
/// <summary>
/// Writes a double to the file
/// </summary>
/// <returns>Number of bytes used to store the double</returns>
public int WriteDouble(double val)
{
byteBuffer[0] = 0x08; // length
doubleBuffer[0] = val;
Buffer.BlockCopy(doubleBuffer, 0, byteBuffer, 1, 8);
return FileStream.WriteData(byteBuffer, 9);
}
/// <summary>
/// Writes a SqlDecimal to the file
/// </summary>
/// <returns>Number of bytes used to store the SqlDecimal</returns>
public int WriteSqlDecimal(SqlDecimal val)
{
int[] arrInt32 = val.Data;
int iLen = 3 + (arrInt32.Length * 4);
int iTotalLen = WriteLength(iLen); // length
// precision
byteBuffer[0] = val.Precision;
// scale
byteBuffer[1] = val.Scale;
// positive
byteBuffer[2] = (byte)(val.IsPositive ? 0x01 : 0x00);
// data value
Buffer.BlockCopy(arrInt32, 0, byteBuffer, 3, iLen - 3);
iTotalLen += FileStream.WriteData(byteBuffer, iLen);
return iTotalLen; // len+data
}
/// <summary>
/// Writes a decimal to the file
/// </summary>
/// <returns>Number of bytes used to store the decimal</returns>
public int WriteDecimal(decimal val)
{
int[] arrInt32 = decimal.GetBits(val);
int iLen = arrInt32.Length * 4;
int iTotalLen = WriteLength(iLen); // length
Buffer.BlockCopy(arrInt32, 0, byteBuffer, 0, iLen);
iTotalLen += FileStream.WriteData(byteBuffer, iLen);
return iTotalLen; // len+data
}
/// <summary>
/// Writes a DateTime to the file
/// </summary>
/// <returns>Number of bytes used to store the DateTime</returns>
public int WriteDateTime(DateTime dtVal)
{
return WriteInt64(dtVal.Ticks);
}
/// <summary>
/// Writes a DateTimeOffset to the file
/// </summary>
/// <returns>Number of bytes used to store the DateTimeOffset</returns>
public int WriteDateTimeOffset(DateTimeOffset dtoVal)
{
// DateTimeOffset gets written as a DateTime + TimeOffset
// both represented as 'Ticks' written as Int64's
return WriteInt64(dtoVal.Ticks) + WriteInt64(dtoVal.Offset.Ticks);
}
/// <summary>
/// Writes a TimeSpan to the file
/// </summary>
/// <returns>Number of bytes used to store the TimeSpan</returns>
public int WriteTimeSpan(TimeSpan timeSpan)
{
return WriteInt64(timeSpan.Ticks);
}
/// <summary>
/// Writes a string to the file
/// </summary>
/// <returns>Number of bytes used to store the string</returns>
public int WriteString(string sVal)
{
if (sVal == null)
{
throw new ArgumentNullException(nameof(sVal), "String to store must be non-null.");
}
int iTotalLen;
if (0 == sVal.Length) // special case of 0 length string
{
const int iLen = 5;
AssureBufferLength(iLen);
byteBuffer[0] = 0xFF;
byteBuffer[1] = 0x00;
byteBuffer[2] = 0x00;
byteBuffer[3] = 0x00;
byteBuffer[4] = 0x00;
iTotalLen = FileStream.WriteData(byteBuffer, 5);
}
else
{
// Convert to a unicode byte array
byte[] bytes = Encoding.Unicode.GetBytes(sVal);
// convert char array into byte array and write it out
iTotalLen = WriteLength(bytes.Length);
iTotalLen += FileStream.WriteData(bytes, bytes.Length);
}
return iTotalLen; // len+data
}
/// <summary>
/// Writes a byte[] to the file
/// </summary>
/// <returns>Number of bytes used to store the byte[]</returns>
public int WriteBytes(byte[] bytesVal, int iLen)
{
if (bytesVal == null)
{
throw new ArgumentNullException(nameof(bytesVal), "Byte array to store must be non-null.");
}
int iTotalLen;
if (0 == iLen) // special case of 0 length byte array "0x"
{
iLen = 5;
AssureBufferLength(iLen);
byteBuffer[0] = 0xFF;
byteBuffer[1] = 0x00;
byteBuffer[2] = 0x00;
byteBuffer[3] = 0x00;
byteBuffer[4] = 0x00;
iTotalLen = FileStream.WriteData(byteBuffer, iLen);
}
else
{
iTotalLen = WriteLength(iLen);
iTotalLen += FileStream.WriteData(bytesVal, iLen);
}
return iTotalLen; // len+data
}
/// <summary>
/// Writes the length of the field using the appropriate number of bytes (ie, 1 if the
/// length is &lt;255, 5 if the length is &gt;=255)
/// </summary>
/// <returns>Number of bytes used to store the length</returns>
private int WriteLength(int iLen)
{
if (iLen < 0xFF)
{
// fits in one byte of memory only need to write one byte
int iTmp = iLen & 0x000000FF;
byteBuffer[0] = Convert.ToByte(iTmp);
return FileStream.WriteData(byteBuffer, 1);
}
// The length won't fit in 1 byte, so we need to use 1 byte to signify that the length
// is a full 4 bytes.
byteBuffer[0] = 0xFF;
// convert int32 into array of bytes
intBuffer[0] = iLen;
Buffer.BlockCopy(intBuffer, 0, byteBuffer, 1, 4);
return FileStream.WriteData(byteBuffer, 5);
}
/// <summary>
/// Flushes the internal buffer to the file stream
/// </summary>
public void FlushBuffer()
{
FileStream.Flush();
}
#endregion
private void AssureBufferLength(int newBufferLength)
{
if (newBufferLength > byteBuffer.Length)
{
byteBuffer = new byte[byteBuffer.Length];
}
}
#region IDisposable Implementation
private bool disposed;
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing)
{
FileStream.Flush();
FileStream.Dispose();
}
disposed = true;
}
~ServiceBufferFileStreamWriter()
{
Dispose(false);
}
#endregion
}
}

View File

@@ -0,0 +1,356 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Data.Common;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using Microsoft.SqlTools.EditorServices.Utility;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
namespace Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage
{
/// <summary>
/// Wrapper around a DbData reader to perform some special operations more simply
/// </summary>
public class StorageDataReader
{
// This code is based on code from Microsoft.SqlServer.Management.UI.Grid, SSMS DataStorage,
// StorageDataReader
// $\Data Tools\SSMS_XPlat\sql\ssms\core\DataStorage\src\StorageDataReader.cs
#region Member Variables
/// <summary>
/// If the DbDataReader is a SqlDataReader, it will be set here
/// </summary>
private readonly SqlDataReader sqlDataReader;
/// <summary>
/// Whether or not the data reader supports SqlXml types
/// </summary>
private readonly bool supportSqlXml;
#endregion
/// <summary>
/// Constructs a new wrapper around the provided reader
/// </summary>
/// <param name="reader">The reader to wrap around</param>
public StorageDataReader(DbDataReader reader)
{
// Sanity check to make sure there is a data reader
Validate.IsNotNull(nameof(reader), reader);
// Attempt to use this reader as a SqlDataReader
sqlDataReader = reader as SqlDataReader;
supportSqlXml = sqlDataReader != null;
DbDataReader = reader;
// Read the columns into a set of wrappers
Columns = DbDataReader.GetColumnSchema().Select(column => new DbColumnWrapper(column)).ToArray();
}
#region Properties
/// <summary>
/// All the columns that this reader currently contains
/// </summary>
public DbColumnWrapper[] Columns { get; private set; }
/// <summary>
/// The <see cref="DbDataReader"/> that will be read from
/// </summary>
public DbDataReader DbDataReader { get; private set; }
#endregion
#region DbDataReader Methods
/// <summary>
/// Pass-through to DbDataReader.ReadAsync()
/// </summary>
/// <param name="cancellationToken">The cancellation token to use for cancelling a query</param>
/// <returns></returns>
public Task<bool> ReadAsync(CancellationToken cancellationToken)
{
return DbDataReader.ReadAsync(cancellationToken);
}
/// <summary>
/// Retrieves a value
/// </summary>
/// <param name="i">Column ordinal</param>
/// <returns>The value of the given column</returns>
public object GetValue(int i)
{
return sqlDataReader == null ? DbDataReader.GetValue(i) : sqlDataReader.GetValue(i);
}
/// <summary>
/// Stores all values of the current row into the provided object array
/// </summary>
/// <param name="values">Where to store the values from this row</param>
public void GetValues(object[] values)
{
if (sqlDataReader == null)
{
DbDataReader.GetValues(values);
}
else
{
sqlDataReader.GetValues(values);
}
}
/// <summary>
/// Whether or not the cell of the given column at the current row is a DBNull
/// </summary>
/// <param name="i">Column ordinal</param>
/// <returns>True if the cell is DBNull, false otherwise</returns>
public bool IsDBNull(int i)
{
return DbDataReader.IsDBNull(i);
}
#endregion
#region Public Methods
/// <summary>
/// Retrieves bytes with a maximum number of bytes to return
/// </summary>
/// <param name="iCol">Column ordinal</param>
/// <param name="maxNumBytesToReturn">Number of bytes to return at maximum</param>
/// <returns>Byte array</returns>
public byte[] GetBytesWithMaxCapacity(int iCol, int maxNumBytesToReturn)
{
if (maxNumBytesToReturn <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxNumBytesToReturn), "Maximum number of bytes to return must be greater than zero.");
}
//first, ask provider how much data it has and calculate the final # of bytes
//NOTE: -1 means that it doesn't know how much data it has
long neededLength;
long origLength = neededLength = GetBytes(iCol, 0, null, 0, 0);
if (neededLength == -1 || neededLength > maxNumBytesToReturn)
{
neededLength = maxNumBytesToReturn;
}
//get the data up to the maxNumBytesToReturn
byte[] bytesBuffer = new byte[neededLength];
GetBytes(iCol, 0, bytesBuffer, 0, (int)neededLength);
//see if server sent back more data than we should return
if (origLength == -1 || origLength > neededLength)
{
//pump the rest of data from the reader and discard it right away
long dataIndex = neededLength;
const int tmpBufSize = 100000;
byte[] tmpBuf = new byte[tmpBufSize];
while (GetBytes(iCol, dataIndex, tmpBuf, 0, tmpBufSize) == tmpBufSize)
{
dataIndex += tmpBufSize;
}
}
return bytesBuffer;
}
/// <summary>
/// Retrieves characters with a maximum number of charss to return
/// </summary>
/// <param name="iCol">Column ordinal</param>
/// <param name="maxCharsToReturn">Number of chars to return at maximum</param>
/// <returns>String</returns>
public string GetCharsWithMaxCapacity(int iCol, int maxCharsToReturn)
{
if (maxCharsToReturn <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxCharsToReturn), "Maximum number of chars to return must be greater than zero");
}
//first, ask provider how much data it has and calculate the final # of chars
//NOTE: -1 means that it doesn't know how much data it has
long neededLength;
long origLength = neededLength = GetChars(iCol, 0, null, 0, 0);
if (neededLength == -1 || neededLength > maxCharsToReturn)
{
neededLength = maxCharsToReturn;
}
Debug.Assert(neededLength < int.MaxValue);
//get the data up to maxCharsToReturn
char[] buffer = new char[neededLength];
if (neededLength > 0)
{
GetChars(iCol, 0, buffer, 0, (int)neededLength);
}
//see if server sent back more data than we should return
if (origLength == -1 || origLength > neededLength)
{
//pump the rest of data from the reader and discard it right away
long dataIndex = neededLength;
const int tmpBufSize = 100000;
char[] tmpBuf = new char[tmpBufSize];
while (GetChars(iCol, dataIndex, tmpBuf, 0, tmpBufSize) == tmpBufSize)
{
dataIndex += tmpBufSize;
}
}
string res = new string(buffer);
return res;
}
/// <summary>
/// Retrieves xml with a maximum number of bytes to return
/// </summary>
/// <param name="iCol">Column ordinal</param>
/// <param name="maxCharsToReturn">Number of chars to return at maximum</param>
/// <returns>String</returns>
public string GetXmlWithMaxCapacity(int iCol, int maxCharsToReturn)
{
if (supportSqlXml)
{
SqlXml sm = GetSqlXml(iCol);
if (sm == null)
{
return null;
}
//this code is mostly copied from SqlClient implementation of returning value for XML data type
StringWriterWithMaxCapacity sw = new StringWriterWithMaxCapacity(null, maxCharsToReturn);
XmlWriterSettings writerSettings = new XmlWriterSettings
{
CloseOutput = false,
ConformanceLevel = ConformanceLevel.Fragment
};
// don't close the memory stream
XmlWriter ww = XmlWriter.Create(sw, writerSettings);
XmlReader reader = sm.CreateReader();
reader.Read();
while (!reader.EOF)
{
ww.WriteNode(reader, true);
}
ww.Flush();
return sw.ToString();
}
object o = GetValue(iCol);
return o?.ToString();
}
#endregion
#region Private Helpers
private long GetBytes(int i, long dataIndex, byte[] buffer, int bufferIndex, int length)
{
return DbDataReader.GetBytes(i, dataIndex, buffer, bufferIndex, length);
}
private long GetChars(int i, long dataIndex, char[] buffer, int bufferIndex, int length)
{
return DbDataReader.GetChars(i, dataIndex, buffer, bufferIndex, length);
}
private SqlXml GetSqlXml(int i)
{
if (sqlDataReader == null)
{
// We need a Sql data reader in order to retrieve sql xml
throw new InvalidOperationException("Cannot retrieve SqlXml without a SqlDataReader");
}
return sqlDataReader.GetSqlXml(i);
}
#endregion
/// <summary>
/// Internal class for writing strings with a maximum capacity
/// </summary>
/// <remarks>
/// This code is take almost verbatim from Microsoft.SqlServer.Management.UI.Grid, SSMS
/// DataStorage, StorageDataReader class.
/// </remarks>
private class StringWriterWithMaxCapacity : StringWriter
{
private bool stopWriting;
private int CurrentLength
{
get { return GetStringBuilder().Length; }
}
public StringWriterWithMaxCapacity(IFormatProvider formatProvider, int capacity) : base(formatProvider)
{
MaximumCapacity = capacity;
}
private int MaximumCapacity { get; set; }
public override void Write(char value)
{
if (stopWriting) { return; }
if (CurrentLength < MaximumCapacity)
{
base.Write(value);
}
else
{
stopWriting = true;
}
}
public override void Write(char[] buffer, int index, int count)
{
if (stopWriting) { return; }
int curLen = CurrentLength;
if (curLen + (count - index) > MaximumCapacity)
{
stopWriting = true;
count = MaximumCapacity - curLen + index;
if (count < 0)
{
count = 0;
}
}
base.Write(buffer, index, count);
}
public override void Write(string value)
{
if (stopWriting) { return; }
int curLen = CurrentLength;
if (value.Length + curLen > MaximumCapacity)
{
stopWriting = true;
base.Write(value.Substring(0, MaximumCapacity - curLen));
}
else
{
base.Write(value);
}
}
}
}
}