Once, when implementing a disk-based hash table in .NET, I found that I needed a way to perform fast random unbuffered asynchronous file operations with multiple reader threads and a single writer thread.
Asynchronous operations are supported by System.IO.FileStream; however, this standard class has major flaws:
- FileStream does not support unbuffered operations. The MSDN article about file buffering states that strict conditions must be met before the FILE_FLAG_NO_BUFFERING flag can be used: memory buffers must be aligned on a physical sector boundary, and the number of bytes to read or write must be an integer multiple of the volume sector size. I've seen solutions like this one, but I could not prove that FileStream aligns its buffer properly, or that cheating with FileOptions will always work.
- Asynchronous IO in FileStream is implemented in a way that cannot be used efficiently in a multithreaded environment, because BeginRead and BeginWrite do not take a file position parameter; instead, the user must set FileStream.Position before calling them. I think this is because the .NET developers wanted to follow the System.IO.Stream specification, even though that base class interface does not fit the asynchronous random access concept well. Anyway, we have to live with this and wrap the code in a critical section, so it looks like:
lock (_lock)
{
    stream.Position = position;
    stream.BeginWrite(buffer, offset, count, callback, state);
}
which can be a performance killer in applications with heavy multithreaded IO. Besides, WinAPI functions such as WriteFile and ReadFile put no constraints on thread synchronisation in overlapped mode, which means we do not have to synchronize file access at all (except for application-specific reasons).
So I made an attempt to create a custom class. At first I implemented everything with plain P/Invoke calls to the WinAPI file functions plus CreateIoCompletionPort and GetQueuedCompletionStatus, but I ended up having to maintain my own thread pool to handle completion callbacks. But wait: .NET has its own IO thread pool, so there should be no need to create another one. This is where ThreadPool.BindHandle comes in - it is the .NET way to associate an opened file handle with an I/O completion port (it calls CreateIoCompletionPort internally) that is bound to the IO thread pool:
public partial class OverlappedStream
{
    private readonly SafeFileHandle _handle;

    public OverlappedStream(
        string path,
        FileMode mode,
        FileAccess access,
        FileShare share,
        FileOptions options,
        bool NoBuffering)
    {
        ...
        _handle = NativeMethods.CreateFileW(
            path, file_access, file_share, IntPtr.Zero, file_creation, file_flags, IntPtr.Zero);
        ASSERT(!_handle.IsInvalid);
        ThreadPool.BindHandle(_handle);
    }
}
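NativeMethods.CreateFileW above is just a P/Invoke wrapper around the Win32 CreateFileW function. A minimal sketch of what such a declaration might look like (the exact version is in the full source; SafeFileHandle comes from Microsoft.Win32.SafeHandles):

[DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
internal static extern SafeFileHandle CreateFileW(
    string lpFileName,           // path
    uint dwDesiredAccess,        // GENERIC_READ / GENERIC_WRITE
    uint dwShareMode,            // FILE_SHARE_* flags
    IntPtr lpSecurityAttributes, // not used here
    uint dwCreationDisposition,  // CREATE_NEW, OPEN_EXISTING, ...
    uint dwFlagsAndAttributes,   // must include FILE_FLAG_OVERLAPPED (and FILE_FLAG_NO_BUFFERING for unbuffered IO)
    IntPtr hTemplateFile);       // not used here

The important part is that dwFlagsAndAttributes includes FILE_FLAG_OVERLAPPED, otherwise ReadFile/WriteFile will not behave asynchronously.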
With unbuffered IO the data we read or write must be properly aligned and must not be moved by the GC while an operation is pending. Unfortunately, there is no way to align buffers in pure .NET, so we have to use unmanaged memory allocated with VirtualAlloc and freed with VirtualFree. Therefore the Read/Write functions accept an IntPtr parameter instead of a byte[] array.
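For illustration, here is a small helper of my own (it is not part of OverlappedStream) showing how such a buffer can be allocated: memory committed by VirtualAlloc is aligned to the system page size (4096 bytes), which satisfies the usual 512- and 4096-byte sector alignment requirements.

using System;
using System.Runtime.InteropServices;

internal static class AlignedBuffer
{
    private const uint MEM_COMMIT = 0x1000;
    private const uint MEM_RESERVE = 0x2000;
    private const uint MEM_RELEASE = 0x8000;
    private const uint PAGE_READWRITE = 0x04;

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern IntPtr VirtualAlloc(IntPtr lpAddress, UIntPtr dwSize,
        uint flAllocationType, uint flProtect);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool VirtualFree(IntPtr lpAddress, UIntPtr dwSize, uint dwFreeType);

    // size should be a multiple of the volume sector size when used with FILE_FLAG_NO_BUFFERING
    public static IntPtr Allocate(uint size)
    {
        IntPtr p = VirtualAlloc(IntPtr.Zero, (UIntPtr)size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
        if (p == IntPtr.Zero)
            throw new System.ComponentModel.Win32Exception(Marshal.GetLastWin32Error());
        return p;
    }

    public static void Free(IntPtr p)
    {
        // With MEM_RELEASE the size must be zero; the whole region is released.
        VirtualFree(p, UIntPtr.Zero, MEM_RELEASE);
    }
}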
/// <summary>
/// Begins an asynchronous unbuffered read operation.
/// </summary>
/// <param name="pBuffer">The unmanaged buffer to read the data into.
/// The buffer must be aligned in memory to the device sector size boundary
/// and must not be deallocated or moved until the operation completes.</param>
/// <param name="fileOffset">File offset to read from</param>
/// <param name="numberOfBytesToRead">The maximum number of bytes to read.</param>
/// <param name="callback">An optional asynchronous callback, to be called when the
/// read is complete.</param>
/// <param name="state">A user-provided object that distinguishes this particular
/// asynchronous read request from other requests.</param>
/// <returns>An IAsyncResult that represents the asynchronous read, which could still be
/// pending.</returns>
public IAsyncResult BeginRead(IntPtr pBuffer, UInt64 fileOffset, UInt32 numberOfBytesToRead,
    AsyncCallback callback, object state)
{
    CheckNotDisposed();
    AsyncJob job = null;
    try
    {
        job = new AsyncJob(callback, state, fileOffset, null);
        UInt32 numberOfBytesRead;
        bool result;
        unsafe
        {
            result = NativeMethods.ReadFile(_handle, pBuffer, numberOfBytesToRead,
                out numberOfBytesRead, job.OverlappedPtr);
        }
        if (result)
            job.CompleteSynchronously();
        else
            CheckErrorPending();
        AsyncJob ret = job;
        job = null;
        return ret;
    }
    finally
    {
        if (job != null)
            job.Dispose();
    }
}
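For reference, the NativeMethods.ReadFile/WriteFile declarations assumed by the code above might look roughly like this (the exact declarations are in the full source; SafeFileHandle comes from Microsoft.Win32.SafeHandles, NativeOverlapped from System.Threading):

internal static partial class NativeMethods
{
    internal const int ERROR_IO_PENDING = 997;

    [DllImport("kernel32.dll", SetLastError = true)]
    internal static extern unsafe bool ReadFile(
        SafeFileHandle hFile, IntPtr lpBuffer, uint nNumberOfBytesToRead,
        out uint lpNumberOfBytesRead, NativeOverlapped* lpOverlapped);

    [DllImport("kernel32.dll", SetLastError = true)]
    internal static extern unsafe bool WriteFile(
        SafeFileHandle hFile, IntPtr lpBuffer, uint nNumberOfBytesToWrite,
        out uint lpNumberOfBytesWritten, NativeOverlapped* lpOverlapped);
}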
/// <summary>
/// Waits for the pending asynchronous operation to complete and frees resources.
/// </summary>
/// <param name="ar">The reference to the pending asynchronous request to wait for.</param>
/// <param name="throwOnError">When true, the method throws <see cref="OverlappedStreamException"/>
/// if an error is detected.</param>
/// <returns>Number of bytes transferred</returns>
public UInt32 EndRead(IAsyncResult ar, bool throwOnError)
{
    CheckNotDisposed();
    AsyncJob job = ar as AsyncJob;
    if (job == null) throw new ArgumentException("Invalid argument", "ar");
    job.WaitForCompletion();
    if (throwOnError)
        ASSERT(job.ErrorCode == 0, unchecked((int)job.ErrorCode));
    return job.NumberOfBytesTransferred;
}
internal static void CheckErrorPending()
{
    int error = Marshal.GetLastWin32Error();
    if (error != NativeMethods.ERROR_IO_PENDING)
    {
        ASSERT(false, error);
    }
}
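The write path follows the same pattern; a sketch of BeginWrite (the exact version is in the full source):

public IAsyncResult BeginWrite(IntPtr pBuffer, UInt64 fileOffset, UInt32 numberOfBytesToWrite,
    AsyncCallback callback, object state)
{
    CheckNotDisposed();
    AsyncJob job = null;
    try
    {
        job = new AsyncJob(callback, state, fileOffset, null);
        UInt32 numberOfBytesWritten;
        bool result;
        unsafe
        {
            result = NativeMethods.WriteFile(_handle, pBuffer, numberOfBytesToWrite,
                out numberOfBytesWritten, job.OverlappedPtr);
        }
        if (result)
            job.CompleteSynchronously();
        else
            CheckErrorPending();
        AsyncJob ret = job;
        job = null;
        return ret;
    }
    finally
    {
        if (job != null)
            job.Dispose();
    }
}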
The documented behaviour of ReadFile/WriteFile in overlapped mode is that Windows puts a completion packet into the queue for both synchronous and asynchronous execution; therefore the AsyncJob, which owns the Overlapped structure, is freed in BeginRead/BeginWrite only when an error occurs.
AsyncJob is a class which allocates the Overlapped structure, handles the completion callback, and also implements IAsyncResult.
It may not be the best solution, but I decided to avoid using a WaitHandle, because it wraps an unmanaged resource and I suspect that launching thousands of simultaneous IO operations could exhaust system resources. That is just a guess. So I ended up using the Monitor class and the well-known lock/Pulse pattern (see the method AsyncJob.completionCallback). I am thinking about redesigning this in a future version, as the IAsyncResult implementation is crippled: AsyncWaitHandle is always null, so there is no way to use WaitHandle.WaitAll. However, it is still possible to wait for completion of multiple async operations with System.Threading.CountdownEvent.
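For example, a rough usage sketch (stream, buffers and sectorSize here are placeholders for illustration; error checking via EndRead is omitted):

// Wait for several overlapped reads at once using CountdownEvent.
using (var pending = new CountdownEvent(buffers.Length))
{
    for (int i = 0; i < buffers.Length; i++)
    {
        stream.BeginRead(buffers[i], (UInt64)i * sectorSize, sectorSize,
            ar => pending.Signal(), // fires on an IO thread-pool thread when the read completes
            null);
    }
    pending.Wait(); // returns once every read has signalled
}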
/// <summary>
/// Internal class, wraps Overlapped structure, completion port callback and IAsyncResult
/// </summary>
sealed class AsyncJob : IAsyncResult, IDisposable
{
    #region privates
    private readonly AsyncCallback _callback;
    private readonly object _state;
    private readonly object _eventHandle = new object();
    private bool _completedSynchronously = false;
    private bool _completed = false;
    private uint _numberOfBytesTransferred = 0;
    private uint _errorCode = 0;
    private readonly unsafe NativeOverlapped* _nativeOverlapped;
    #endregion

    /// <summary>
    /// Create instance, automatically allocates NativeOverlapped structure
    /// </summary>
    /// <param name="callback">User specified callback</param>
    /// <param name="state">User specified state</param>
    /// <param name="fileOffset">Start position</param>
    /// <param name="userData">An object or array of objects representing the input or output buffer
    /// for the operation. Buffer is pinned until object is disposed.</param>
    public AsyncJob(AsyncCallback callback, object state, UInt64 fileOffset, object userData)
    {
        _callback = callback;
        _state = state;
        Overlapped ov = new Overlapped(unchecked((int)(fileOffset & 0xFFFFFFFF)),
            unchecked((int)((fileOffset >> 32) & 0xFFFFFFFF)), IntPtr.Zero, this);
        unsafe { _nativeOverlapped = ov.UnsafePack(completionCallback, userData); }
    }
    #region IDisposable
    bool _disposed = false;

    public void Dispose()
    {
        if (_disposed) return;
        unsafe
        {
            Overlapped.Unpack(_nativeOverlapped);
            Overlapped.Free(_nativeOverlapped);
        }
        _disposed = true;
        GC.SuppressFinalize(this);
    }
    #endregion

    #region data accessors
    public unsafe NativeOverlapped* OverlappedPtr
    {
        get { return _nativeOverlapped; }
    }

    public uint NumberOfBytesTransferred
    {
        get { return _numberOfBytesTransferred; }
    }
    #endregion
    public void CompleteSynchronously()
    {
        _completedSynchronously = true;
    }

    public void WaitForCompletion()
    {
        lock (_eventHandle)
        {
            while (!_completed)
                Monitor.Wait(_eventHandle);
        }
    }

    public uint ErrorCode { get { return _errorCode; } }

    #region IAsyncResult Members
    public object AsyncState
    {
        get { return _state; }
    }

    public WaitHandle AsyncWaitHandle
    {
        get { return null; }
    }

    public bool CompletedSynchronously
    {
        get { return _completedSynchronously; }
    }

    public bool IsCompleted
    {
        get { return _completed; }
    }
    #endregion
    #region privates
    private unsafe void completionCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP)
    {
        try
        {
            if (errorCode != 0)
            {
                _log.ErrorFormat("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
            }
            lock (_eventHandle)
            {
                System.Diagnostics.Debug.Assert(!_completed);
                _errorCode = errorCode;
                _numberOfBytesTransferred = numBytes;
                _completed = true;
                if (_callback != null)
                    _callback.Invoke(this);
                Monitor.Pulse(_eventHandle);
            }
        }
        catch (Exception ex)
        {
            _log.Error("OverlappedStream.completionCallback error", ex);
        }
        finally
        {
            this.Dispose();
        }
    }
    #endregion privates
}
Things TODO
I intend to implement WriteFileGather and ReadFileScatter, as they should be much more efficient when reading/writing sequential data pages.
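For reference, rough P/Invoke sketches of these two functions could look like the following; FILE_SEGMENT_ELEMENT is represented here simply as a UInt64 holding a page address, and this is not the final interface:

// Sketch only: each UInt64 segment element holds the address of one system-page-sized,
// sector-aligned buffer; the handle must be opened with FILE_FLAG_OVERLAPPED and
// FILE_FLAG_NO_BUFFERING. If in doubt, keep the segment array pinned until the IO completes.
[DllImport("kernel32.dll", SetLastError = true)]
internal static extern unsafe bool ReadFileScatter(
    SafeFileHandle hFile, UInt64[] aSegmentArray, uint nNumberOfBytesToRead,
    IntPtr lpReserved, NativeOverlapped* lpOverlapped);

[DllImport("kernel32.dll", SetLastError = true)]
internal static extern unsafe bool WriteFileGather(
    SafeFileHandle hFile, UInt64[] aSegmentArray, uint nNumberOfBytesToWrite,
    IntPtr lpReserved, NativeOverlapped* lpOverlapped);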
Please see the next post for some usage samples. Currently I am working on buffered overlapped IO support, which allows arbitrary buffer sizes and does not require buffer alignment, so that normal byte[] arrays can be passed to ReadFile/WriteFile. The class interface will be different.
Full source code can be found at https://overlappedstream.codeplex.com/

I hope the information above will be useful for implementing proper overlapped IO file operations in .NET.