Once, while implementing a disk-based hash table in .NET, I found I needed a way to perform fast, random, unbuffered asynchronous IO with multiple reader threads and a single writer thread.
Asynchronous operations are supported by System.IO.FileStream; however, this standard class has major flaws:
- FileStream does not support unbuffered operations. An MSDN article about file buffering says that strict conditions must be met before we can use the FILE_FLAG_NO_BUFFERING flag: memory buffers must be aligned on a physical sector boundary, and the number of bytes to read or write must be an integer multiple of the volume sector size. I've seen solutions like this one; however, I could not prove that FileStream aligns its buffer properly or that cheating with FileOptions will always work.
- Asynchronous IO in FileStream is implemented in a way that cannot be used efficiently in a multithreaded environment: BeginRead and BeginWrite have no file position parameter, so the caller must set FileStream.Position before calling them. I think this is because the .NET developers wanted to follow the System.IO.Stream specification, even though this base class interface does not fit the concept of asynchronous random access well. Anyway, we have to live with this and wrap the code in a critical section, so it should look like:
    lock (_lock)
    {
        stream.Position = position;
        stream.BeginWrite(buffer, offset, count, callback, state);
    }
which could be a performance killer in apps with heavy multithreaded IO. Besides, WinAPI functions such as WriteFile and ReadFile put no constraints on thread synchronisation in overlapped mode, which means we do not have to synchronize file access (except for application-specific reasons).
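To illustrate why no shared file pointer (and hence no lock) is needed in overlapped mode: each request carries its own file offset inside the OVERLAPPED structure, split into two 32-bit halves. The following is a minimal sketch, not part of the library; the class and method names are hypothetical, and only System.Threading.NativeOverlapped with its OffsetLow/OffsetHigh fields is a real .NET type.

```csharp
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
static class OverlappedOffsetSketch
{
    // Builds a NativeOverlapped whose offset fields describe fileOffset.
    // Every request gets its own structure, so concurrent requests never
    // share (or fight over) a "current position".
    public static NativeOverlapped ForOffset(ulong fileOffset)
    {
        return new NativeOverlapped
        {
            OffsetLow = unchecked((int)(fileOffset & 0xFFFFFFFF)),
            OffsetHigh = unchecked((int)(fileOffset >> 32)),
        };
    }

    // Reassembles the 64-bit offset from the two 32-bit halves.
    public static ulong ToOffset(NativeOverlapped ov)
    {
        return unchecked(((ulong)(uint)ov.OffsetHigh << 32) | (uint)ov.OffsetLow);
    }
}
```

Two threads can therefore issue requests for different offsets at the same time without coordinating with each other, which is what the Position-based FileStream interface cannot offer.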
    public partial class OverlappedStream
    {
        private readonly SafeFileHandle _handle;

        public OverlappedStream(
            string path, FileMode mode, FileAccess access, FileShare share,
            FileOptions options, bool NoBuffering)
        {
            ...
            _handle = NativeMethods.CreateFileW(
                path, file_access, file_share, IntPtr.Zero,
                file_creation, file_flags, IntPtr.Zero);
            ASSERT(!_handle.IsInvalid);
            ThreadPool.BindHandle(_handle);
        }
    }

With unbuffered IO, the data we read or write must be properly aligned and must not be moved by the GC while an operation is pending. Unfortunately, there is no way to align buffers in pure .NET, so we have to use unmanaged memory allocated with VirtualAlloc and freed with VirtualFree. The Read/Write functions therefore accept an IntPtr parameter instead of a byte[] array.
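As a rough sketch of the allocation step described above (not the library's actual code), a buffer could be obtained as follows. The class and helper names are hypothetical; VirtualAlloc and VirtualFree are the real kernel32 functions, and the sketch assumes that the page-aligned memory they return satisfies the sector alignment of the target volume. The allocation itself only works on Windows.

```csharp
using System;
using System.ComponentModel;
using System.Runtime.InteropServices;

// Hypothetical helper, for illustration only.
static class AlignedBufferSketch
{
    const uint MEM_COMMIT = 0x1000, MEM_RESERVE = 0x2000, MEM_RELEASE = 0x8000;
    const uint PAGE_READWRITE = 0x04;

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern IntPtr VirtualAlloc(IntPtr lpAddress, UIntPtr dwSize,
                                      uint flAllocationType, uint flProtect);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    static extern bool VirtualFree(IntPtr lpAddress, UIntPtr dwSize, uint dwFreeType);

    // Unbuffered transfers must be a whole number of sectors,
    // so round the requested size up to the next multiple.
    public static uint RoundUpToSector(uint byteCount, uint sectorSize)
    {
        return checked((byteCount + sectorSize - 1) / sectorSize * sectorSize);
    }

    // Allocates committed, page-aligned memory outside the GC heap,
    // so the GC can never move it while an operation is pending.
    public static IntPtr Allocate(uint byteCount, uint sectorSize)
    {
        uint size = RoundUpToSector(byteCount, sectorSize);
        IntPtr p = VirtualAlloc(IntPtr.Zero, (UIntPtr)size,
                                MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
        if (p == IntPtr.Zero)
            throw new Win32Exception(Marshal.GetLastWin32Error());
        return p;
    }

    // MEM_RELEASE requires a size of zero and frees the whole region.
    public static void Free(IntPtr p)
    {
        if (!VirtualFree(p, UIntPtr.Zero, MEM_RELEASE))
            throw new Win32Exception(Marshal.GetLastWin32Error());
    }
}
```

The pointer returned by Allocate is the kind of value one would pass as pBuffer to the read and write methods, and it must be released with Free only after every pending operation on it has completed.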
    /// <summary>
    /// Begins an asynchronous unbuffered read operation
    /// </summary>
    /// <param name="pBuffer">The unmanaged buffer to read the data into.
    /// Buffer should be aligned in memory to device sector size boundary.
    /// Buffer should not be deallocated or moved until operation is completed.</param>
    /// <param name="fileOffset">File pointer</param>
    /// <param name="numberOfBytesToRead">The maximum number of bytes to read.</param>
    /// <param name="callback">An optional asynchronous callback, to be called when the
    /// read is complete.</param>
    /// <param name="state">A user-provided object that distinguishes this particular
    /// asynchronous read request from other requests.</param>
    /// <returns>An IAsyncResult that represents the asynchronous read, which could still be
    /// pending.</returns>
    public IAsyncResult BeginRead(IntPtr pBuffer, UInt64 fileOffset, UInt32 numberOfBytesToRead,
        AsyncCallback callback, object state)
    {
        CheckNotDisposed();
        AsyncJob job = null;
        try
        {
            job = new AsyncJob(callback, state, fileOffset, null);
            UInt32 numberOfBytesRead;
            bool result;
            unsafe
            {
                result = NativeMethods.ReadFile(_handle, pBuffer, numberOfBytesToRead,
                    out numberOfBytesRead, job.OverlappedPtr);
            }
            if (result)
                job.CompleteSynchronously();
            else
                CheckErrorPending();
            // Success or pending: ownership of the job passes to the completion callback.
            AsyncJob ret = job;
            job = null;
            return ret;
        }
        finally
        {
            // Only reached with a non-null job when an error occurred.
            if (job != null)
                job.Dispose();
        }
    }

    /// <summary>
    /// Waits for the pending asynchronous operation to complete and frees resources.
    /// </summary>
    /// <param name="ar">The reference to the pending asynchronous request to wait for.</param>
    /// <param name="throwOnError">When true, method throws <see cref="OverlappedStreamException"/>
    /// if any error detected.</param>
    /// <returns>Number of bytes transferred</returns>
    public UInt32 EndRead(IAsyncResult ar, bool throwOnError)
    {
        CheckNotDisposed();
        AsyncJob job = ar as AsyncJob;
        if (job == null)
            throw new ArgumentException("Invalid argument", "ar");
        job.WaitForCompletion();
        if (throwOnError)
            ASSERT(job.ErrorCode == 0, unchecked((int)job.ErrorCode));
        return job.NumberOfBytesTransferred;
    }

    internal static void CheckErrorPending()
    {
        int error = Marshal.GetLastWin32Error();
        if (error != NativeMethods.ERROR_IO_PENDING)
        {
            ASSERT(false, error);
        }
    }

The documented behaviour of ReadFile/WriteFile in overlapped mode is that Windows queues a completion packet whether the operation completes synchronously or asynchronously. Therefore the AsyncJob, which owns the Overlapped structure, is freed in BeginRead/BeginWrite only in case of an error.
AsyncJob is a class that allocates the Overlapped structure, handles the completion callback, and implements IAsyncResult.
It may not be the best solution, but I decided to avoid WaitHandle because it wraps an unmanaged resource, and I suspect that launching thousands of simultaneous IO operations could exhaust resources. That is just a guess. So I settled on the Monitor class and the well-known lock-and-pulse construct (see the method AsyncJob.completionCallback). I am thinking about redesigning this in a future version, because the IAsyncResult implementation is crippled: AsyncWaitHandle is always null, so there is no way to use WaitHandle.WaitAll. However, it is still possible to wait for the completion of multiple async operations with System.Threading.CountdownEvent.
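The CountdownEvent approach could look roughly like the sketch below. It does not touch the real stream: BeginFakeOperation and FakeResult are hypothetical stand-ins for BeginRead/BeginWrite and AsyncJob, used so that the example runs on its own. The pattern is that each completion callback signals the countdown once, and the issuing thread waits until the count reaches zero.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, for illustration only.
static class CountdownSketch
{
    // Minimal IAsyncResult; AsyncWaitHandle is null, as in AsyncJob.
    sealed class FakeResult : IAsyncResult
    {
        public FakeResult(object state) { AsyncState = state; }
        public object AsyncState { get; private set; }
        public WaitHandle AsyncWaitHandle { get { return null; } }
        public bool CompletedSynchronously { get { return false; } }
        public bool IsCompleted { get { return true; } }
    }

    // Stand-in for stream.BeginRead/BeginWrite: invokes the callback
    // on a thread pool thread, like an IO completion would.
    static void BeginFakeOperation(AsyncCallback callback, object state)
    {
        ThreadPool.QueueUserWorkItem(_ => callback(new FakeResult(state)));
    }

    // Starts operationCount operations and blocks until all callbacks ran.
    // Returns the number of callbacks observed.
    public static int RunAndWait(int operationCount)
    {
        int completed = 0;
        using (var countdown = new CountdownEvent(operationCount))
        {
            for (int i = 0; i < operationCount; i++)
            {
                BeginFakeOperation(ar =>
                {
                    Interlocked.Increment(ref completed);
                    countdown.Signal(); // one signal per finished operation
                }, i);
            }
            countdown.Wait(); // returns once every operation has signalled
        }
        return completed;
    }
}
```

A single CountdownEvent serves any number of operations, so this keeps the number of kernel wait objects constant regardless of how many requests are in flight.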
    /// <summary>
    /// Internal class, wraps Overlapped structure, completion port callback and IAsyncResult
    /// </summary>
    sealed class AsyncJob : IAsyncResult, IDisposable
    {
        #region privates
        private readonly AsyncCallback _callback;
        private readonly object _state;
        private readonly object _eventHandle = new object();
        private bool _completedSynchronously = false;
        private bool _completed = false;
        private uint _numberOfBytesTransferred = 0;
        private uint _errorCode = 0;
        private readonly unsafe NativeOverlapped* _nativeOverlapped;
        #endregion

        /// <summary>
        /// Create instance, automatically allocates NativeOverlapped structure
        /// </summary>
        /// <param name="callback">User specified callback</param>
        /// <param name="state">User specified state</param>
        /// <param name="fileOffset">Start position</param>
        /// <param name="userData">An object or array of objects representing the input or output buffer
        /// for the operation. Buffer is pinned until object is disposed.</param>
        public AsyncJob(AsyncCallback callback, object state, UInt64 fileOffset, object userData)
        {
            _callback = callback;
            _state = state;
            Overlapped ov = new Overlapped(
                unchecked((int)(fileOffset & 0xFFFFFFFF)),
                unchecked((int)((fileOffset >> 32) & 0xFFFFFFFF)),
                IntPtr.Zero, this);
            unsafe { _nativeOverlapped = ov.UnsafePack(completionCallback, userData); }
        }

        #region IDisposable
        bool _disposed = false;

        public void Dispose()
        {
            if (_disposed) return;
            unsafe
            {
                Overlapped.Unpack(_nativeOverlapped);
                Overlapped.Free(_nativeOverlapped);
            }
            _disposed = true;
            GC.SuppressFinalize(this);
        }
        #endregion

        #region data accessors
        public unsafe NativeOverlapped* OverlappedPtr { get { return _nativeOverlapped; } }
        public uint NumberOfBytesTransferred { get { return _numberOfBytesTransferred; } }
        #endregion

        public void CompleteSynchronously() { _completedSynchronously = true; }

        public void WaitForCompletion()
        {
            lock (_eventHandle)
            {
                while (!_completed)
                    Monitor.Wait(_eventHandle);
            }
        }

        public uint ErrorCode { get { return _errorCode; } }

        #region IAsyncResult Members
        public object AsyncState { get { return _state; } }
        public WaitHandle AsyncWaitHandle { get { return null; } }
        public bool CompletedSynchronously { get { return _completedSynchronously; } }
        public bool IsCompleted { get { return _completed; } }
        #endregion

        #region privates
        private unsafe void completionCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP)
        {
            try
            {
                if (errorCode != 0)
                {
                    _log.ErrorFormat("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
                }
                lock (_eventHandle)
                {
                    System.Diagnostics.Debug.Assert(!_completed);
                    _errorCode = errorCode;
                    _numberOfBytesTransferred = numBytes;
                    _completed = true;
                    if (_callback != null)
                        _callback.Invoke(this);
                    Monitor.Pulse(_eventHandle);
                }
            }
            catch (Exception ex)
            {
                _log.Error("OverlappedStream.completionCallback error", ex);
            }
            finally
            {
                this.Dispose();
            }
        }
        #endregion privates
    }

Things TODO
I intend to implement WriteFileGather and ReadFileScatter support, as they should be much more efficient when reading or writing sequential data pages.
Please see the next post for some usage samples. Currently I am working on buffered overlapped IO support, which allows an arbitrary buffer size and does not require buffer alignment, so that normal byte[] arrays can be passed to ReadFile/WriteFile. The class interface will be different.
Full source code can be found at https://overlappedstream.codeplex.com/
I hope the information above will be useful for implementing proper overlapped file IO in .NET.
Wow, great stuff! Thank you for sharing this. A few questions:
1. I noticed in the code that it's not possible to use SequentialAccess with OverlappedStreamUnbuffered. I'm assuming there's a good reason for that. Can you point me in the right direction for some documentation that explains that?
2. About the WaitHandle, I suspect that you're right that it would take a lot of resources to implement it; I wonder if it could be initialized lazily so that the hit is only taken if calling code asks for it. I think Jeffrey Richter does something like this in his AsyncEnumerator dll.
3. I'd like to use this code to make a utility similar to Wes Brown's UBCopy which you linked to above, but with true async. He decided not to go the overlapped route because he didn't want the unsafe code. I know you said you implemented this library to do random-access async I/O, but I wonder, have you done any experimenting with it to see how quickly you can get it to copy a really big file? If so, would you mind posting some sample code? ESEUTIL is good at copying big files without running out of memory, but I want to find something even faster if I can.
4. With the page-aligned requirement, how do you handle the end of the file if the total file size isn't a multiple of 4k?
Thanks, Matt
Hi Matt,
SequentialAccess is the .NET equivalent of FILE_FLAG_SEQUENTIAL_SCAN, which hints to the OS to read ahead more aggressively, so this flag relates to file cache management.
Unbuffered IO (FILE_FLAG_NO_BUFFERING) disables system caching for data reads and writes, so SequentialAccess and RandomAccess flags are meaningless in this mode.
Here is more information about this matter: http://msdn.microsoft.com/en-us/library/windows/desktop/aa363858(v=vs.85).aspx#caching_behavior
When the file size isn't a multiple of 4k, data writes are still performed in 4k chunks, and an application can move the file pointer only to sector-aligned positions.
So after the writes you will need to close the file, open it again for write access in buffered mode, and set the file length with the SetEndOfFile function.
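A minimal sketch of that last step, using only managed code: it assumes FileStream.SetLength is an acceptable substitute for calling SetEndOfFile directly, and the class and method names are hypothetical.

```csharp
using System;
using System.IO;

// Hypothetical helper, for illustration only.
static class TrimSketch
{
    // Reopens the file in ordinary buffered mode and cuts it down to the
    // real data length, discarding the padding that the last
    // sector-sized unbuffered write left at the end.
    public static void TrimToLength(string path, long actualLength)
    {
        using (var fs = new FileStream(path, FileMode.Open, FileAccess.Write, FileShare.None))
        {
            fs.SetLength(actualLength);
        }
    }
}
```

For example, after writing 5000 bytes of data as two 4096-byte sectors, TrimToLength(path, 5000) would bring the file back from 8192 bytes to 5000.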
I think that unbuffered IO should work perfectly for copying large files, and you might consider using ReadScatter/WriteGather operations to make it even faster.
Hope this helps.
Cheers.