Tuesday, July 12, 2011

MacOS and WCF struggle

I spent several unforgettable days connecting a MacOS client to a Windows WCF service. I chose gSOAP (version 2.8.3), the best open-source library I could find.

The server used basicHttpBinding over SSL with the TransportWithMessageCredential security mode and clientCredentialType="UserName".
Generating the proxy with gSOAP should have been quite simple: first, create a header file describing the service with the wsdl2h tool; then generate the proxies with soapcpp2 and include them in the build.

Here is the log:
First, I installed the ZLIB and OpenSSL libraries in the project directory.
Next, I found that the gSOAP tool wsdl2h could not work over HTTPS without a special build. I was not able to compile it as described in the documentation, because 'make secure' simply ignored the WITH_OPENSSL flag. I had to modify one line in gsoap-2.8/gsoap/wsdl/Makefile.am:
wsdl2h_CPPFLAGS=$(AM_CPPFLAGS) $(SOAPCPP2_NONAMESPACES) -D$(platform) $(WSDL2H_EXTRA_FLAGS)
so that wsdl2h is now built with SSL support by the main make.
Alternatively, wsdl2h can be compiled directly with:
make -f MakefileManual CMFLAGS=-DWITH_OPENSSL LIBS="-lcrypto -lssl"

The next important step is to prepare the typemap.dat file, which contains namespace definitions, custom classes/structs and headers that will later be included in the autogenerated files. As we use message-level authentication, the following lines should be added at the end:
[
#import "wsse.h" 
]
where "wsse.h" (found in share/gsoap/import) defines the SOAP_ENV__Header structure as follows:
struct SOAP_ENV__Header {
    mustUnderstand _wsse__Security *wsse__Security;
};
Without these changes it is not possible to use the wsse plugin to set the message authentication header in the SOAP envelope.
Then we generate the header file from the WSDL:
./bin/wsdl2h -t typemap.dat -x -o testwcf1.h https://zoo.office.local/Service1.svc?wsdl
and create proxies:
./bin/soapcpp2 -j -wx -1 -C -I ./:./share/gsoap:./share/gsoap/import:./share/gsoap/plugin testwcf1.h
I created the project in Xcode, so I added the last step as a script build phase.
In addition, I had to add wsseapi.cpp, mecevp.c and smdevp.c from ./share/gsoap/plugin to the build, as well as stdsoap2.cpp and dom.cpp from the gSOAP source folder.
Finally, define the following preprocessor macros: WITH_OPENSSL, WITH_GZIP, WITH_DOM.

Sample client code:
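#include <iostream>
#include "soapBasicHttpBinding_USCOREIService1Proxy.h" // generated by soapcpp2; exact name depends on the binding
#include "BasicHttpBinding_USCOREIService1.nsmap"     // generated namespace table; check the soapcpp2 output for its name
#include "wsseapi.h"                                  // WS-Security plugin API from ./share/gsoap/plugin

int main()
{
    soap_ssl_init();
    struct soap soap(SOAP_IO_DEFAULT | SOAP_IO_KEEPALIVE);

    // register the WS-Security plugin before adding security headers
    soap_register_plugin(&soap, soap_wsse);
    // add a wsse:Security header carrying a plain-text UsernameToken
    soap_wsse_add_UsernameTokenText(&soap, "Id", "login", "password");

    // verify the server certificate against the CA in ./ca.pem
    if (soap_ssl_client_context(&soap, SOAP_SSL_DEFAULT, NULL, NULL, "./ca.pem", NULL, NULL))
    {
        soap_print_fault(&soap, stderr);
        return 1;
    }

    // with -j the proxy shares the soap context passed in instead of owning one
    BasicHttpBinding_USCOREIService1Proxy svc(&soap);
    svc.soap_endpoint = "https://zoo.office.local/Service1.svc";

    _test__GetData gd;
    _test__GetDataResponse resp;

    int input_value = 42;
    gd.value = &input_value;
    int result = svc.GetData(&gd, &resp);
    if (SOAP_OK == result) {
        std::cout << "result: " << *(resp.GetDataResult) << "\n";
    }
    else {
        svc.soap_stream_fault(std::cout);
        return 1;
    }
    return 0;
}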

Wednesday, May 25, 2011

Unbuffered Overlapped IO in .NET (continued)

See the first article about overlapped IO.
As promised, I've created a buffered version of OverlappedStream. It takes a byte[] buffer instead of an unmanaged IntPtr, since there are no memory alignment restrictions when FILE_FLAG_NO_BUFFERING is not used.
Also, the unbuffered version now implements both WriteFileGather and ReadFileScatter. The buffer size and alignment restrictions are different for gather/scatter operations: each buffer must be at least one system memory page in size and aligned on a page boundary. The OverlappedStreamUnbuffered.SystemPageSize property returns the page size, and it must be used for buffer allocations.
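The scatter/gather rules boil down to address arithmetic. Below is a minimal C++ sketch, assuming the caller supplies a power-of-two page size (on Windows it would come from GetSystemInfo); the helpers is_page_aligned and split_into_pages are hypothetical and not part of OverlappedStream. It turns one page-aligned block into the per-page pointer list that ReadFileScatter/WriteFileGather consume, including the terminating null element those functions expect at the end of the segment array.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: true if p lies on a page boundary.
// page_size must be a power of two.
inline bool is_page_aligned(const void* p, std::size_t page_size) {
    assert(page_size != 0 && (page_size & (page_size - 1)) == 0);
    return (reinterpret_cast<std::uintptr_t>(p) & (page_size - 1)) == 0;
}

// Hypothetical helper: splits a contiguous, page-aligned block into one
// pointer per page, followed by a null terminator (the layout of the
// segment array used by ReadFileScatter/WriteFileGather).
// Returns an empty vector if the block is misaligned or its size is not a
// whole number of pages.
std::vector<void*> split_into_pages(void* block, std::size_t bytes, std::size_t page_size) {
    std::vector<void*> segments;
    if (!is_page_aligned(block, page_size) || bytes == 0 || bytes % page_size != 0)
        return segments;
    char* base = static_cast<char*>(block);
    for (std::size_t off = 0; off < bytes; off += page_size)
        segments.push_back(base + off);  // each segment receives exactly one page
    segments.push_back(nullptr);         // terminating element
    return segments;
}
```

For example, a 16 KB block aligned on a 4096-byte boundary yields four page pointers plus the null terminator, while a misaligned block or a size that is not a whole number of pages yields an empty list.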

Some examples


OverlappedStreamUnbuffered stream = new OverlappedStreamUnbuffered(
 path, 
 System.IO.FileMode.OpenOrCreate,
 System.IO.FileAccess.ReadWrite, 
 System.IO.FileShare.Read, 
 OverlappedStreamUnbuffered.UnbufferedFileOptions.None);

IntPtr[] buffers = new IntPtr[BUFFERS];
for (int i = 0; i < buffers.Length; i++)
{
 // allocate aligned buffers with VirtualAlloc or custom class
 buffers[i] = UnmanagedBufferPool.Alloc((int)OverlappedStreamUnbuffered.SystemPageSize);
}

IAsyncResult ar = stream.BeginReadScatter(buffers, startAddr, null, null);

uint bytesRead = OverlappedStream.EndOperation(ar, throwOnError);
// it is also possible to use more common syntax:
// stream.EndRead(ar);

Final notes

Asynchronous file access is often tricky, as is any multithreaded programming. One non-obvious behaviour is that when a thread exits, pending IO operations started by that thread are aborted. It is well described here: http://www.beefycode.com/post/Overlapped-IO-Aborted-by-Terminating-Thread.aspx.
Therefore, always wait for all asynchronous operations to complete before the issuing thread exits, even if they were started on a thread pool thread.

Full source code can be found at https://overlappedstream.codeplex.com/

Monday, May 2, 2011

Unbuffered Overlapped IO in .NET

Once, while implementing a disk-based hash table in .NET, I found that I needed a way to perform fast, random-access, unbuffered asynchronous operations with multiple reader threads and a single writer thread.
Asynchronous operations are supported by System.IO.FileStream; however, this standard class has major flaws:

  1. FileStream does not support unbuffered operations. According to an MSDN article about file buffering, strict conditions must be met before the FILE_FLAG_NO_BUFFERING flag can be used, such as aligning memory buffers on a physical sector boundary and reading/writing only an integer multiple of the volume sector size. I've seen solutions like this one, but I could not confirm that FileStream aligns its buffer properly and that cheating with FileOptions will always work.
  2. Asynchronous IO in FileStream is implemented in a way that cannot be used efficiently in a multithreaded environment, because BeginRead and BeginWrite have no file position parameter; instead, the caller must set FileStream.Position before calling them. I think that is because the .NET developers wanted to follow the System.IO.Stream specification, whereas that base class interface does not fit the asynchronous random access concept well. Anyway, we have to live with this and wrap the code in a critical section, like this:
    lock (_lock)
    {
     stream.Position = position;
     stream.BeginWrite(buffer, offset, count, callback, state);
    }
    
    which could be a performance killer in apps with heavy multithreaded IO. Besides, WinAPI functions such as ReadFile and WriteFile impose no thread synchronisation constraints in overlapped mode, because the file offset travels with each request in the OVERLAPPED structure; that means we do not have to synchronise file access (except for application-specific reasons).
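The difference between a shared file position and a per-request offset can be illustrated outside Windows. This is a hedged C++ sketch that uses POSIX lseek/read and pread purely as analogues: SharedPositionReader mimics FileStream (seek and read must sit in one critical section), while positional_read mimics ReadFile with the offset in the OVERLAPPED structure (no lock needed). Both names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <fcntl.h>
#include <unistd.h>

// Analogue of FileStream: one shared file position per handle.
// "Seek, then read" must be a single critical section, otherwise another
// thread could move the position between the two calls.
class SharedPositionReader {
public:
    explicit SharedPositionReader(int fd) : fd_(fd) { assert(fd >= 0); }

    ssize_t read_at(void* buf, std::size_t count, off_t offset) {
        std::lock_guard<std::mutex> guard(lock_);  // serialises every reader
        if (lseek(fd_, offset, SEEK_SET) == static_cast<off_t>(-1))
            return -1;
        return read(fd_, buf, count);
    }

private:
    int fd_;
    std::mutex lock_;
};

// Analogue of ReadFile with the offset stored in OVERLAPPED: the offset is
// part of each request and the shared position is left untouched, so
// concurrent callers need no lock.
inline ssize_t positional_read(int fd, void* buf, std::size_t count, off_t offset) {
    assert(fd >= 0);
    return pread(fd, buf, count, offset);
}
```

With the first shape every reader queues on one mutex; with the second, readers at different offsets proceed independently, which is the property the custom class is after.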
So I made an attempt to create a custom class. First, I implemented everything with plain P/Invoke calls to the WinAPI file functions plus CreateIoCompletionPort and GetQueuedCompletionStatus, but ended up having to maintain a thread pool to handle completion callbacks. But wait: .NET has its own IO thread pool, so there should be no need to create another one. Here comes ThreadPool.BindHandle, the .NET way to associate an open file handle with the I/O completion port that serves the IO thread pool (it calls CreateIoCompletionPort internally):
public partial class OverlappedStream
{
 private readonly SafeFileHandle _handle;

 public OverlappedStream(
  string path,
  FileMode mode,
  FileAccess access,
  FileShare share,
  FileOptions options,
  bool NoBuffering)
 {
  ...

  _handle = NativeMethods.CreateFileW(
   path, file_access, file_share, IntPtr.Zero, file_creation, file_flags, IntPtr.Zero);

  ASSERT(!_handle.IsInvalid);

  ThreadPool.BindHandle(_handle);
 }
}
With unbuffered IO, the data we read or write must be properly aligned and must not be moved by the GC while an operation is pending. Unfortunately, there is no way to align buffers in pure .NET, so we have to use unmanaged memory allocated with VirtualAlloc (which returns page-aligned memory) and freed with VirtualFree. Therefore, the Read/Write functions accept an IntPtr parameter instead of a byte[] array.
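The two unbuffered-IO rules (aligned address, size a multiple of the sector size) can be sketched in portable C++. Here std::aligned_alloc stands in for VirtualAlloc (MSVC does not provide std::aligned_alloc; on Windows _aligned_malloc or VirtualAlloc would be used instead), and the sector size is assumed to be a power of two supplied by the caller. All names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>

// Rounds a byte count up to the next multiple of the sector size.
// The sector size is assumed to be a power of two (512 and 4096 are common).
constexpr std::size_t round_up_to_sector(std::size_t bytes, std::size_t sector_size) {
    return (bytes + sector_size - 1) & ~(sector_size - 1);
}

// Releases memory obtained from std::aligned_alloc.
struct FreeDeleter {
    void operator()(void* p) const noexcept { std::free(p); }
};

struct SectorBuffer {
    std::unique_ptr<void, FreeDeleter> data;  // aligned on a sector boundary
    std::size_t size = 0;                     // always a multiple of the sector size
};

// Allocates at least `bytes` bytes so that both the address and the size
// are sector multiples. std::aligned_alloc stands in for VirtualAlloc.
inline SectorBuffer alloc_sector_aligned(std::size_t bytes, std::size_t sector_size) {
    assert(sector_size != 0 && (sector_size & (sector_size - 1)) == 0);
    SectorBuffer buf;
    buf.size = round_up_to_sector(bytes == 0 ? 1 : bytes, sector_size);
    // aligned_alloc requires the size to be a multiple of the alignment,
    // which round_up_to_sector guarantees
    buf.data.reset(std::aligned_alloc(sector_size, buf.size));
    return buf;
}
```

A request for 1000 bytes with 512-byte sectors therefore allocates 1024 bytes on a 512-byte boundary.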
/// <summary>
/// Begins an asynchronous unbuffered read operation
/// </summary>
/// <param name="pBuffer">The unmanaged buffer to read the data into. 
/// Buffer should be aligned in memory to device sector size boundary. 
/// Buffer should not be deallocated or moved until operation is completed.</param>
/// <param name="fileOffset">Byte offset in the file at which the read starts 
/// (a multiple of the sector size for unbuffered IO)</param>
/// <param name="numberOfBytesToRead">The maximum number of bytes to read.</param>
/// <param name="callback">An optional asynchronous callback, to be called when the 
/// read is complete.</param>
/// <param name="state">A user-provided object that distinguishes this particular 
/// asynchronous read request from other requests.</param>
/// <returns>An IAsyncResult that represents the asynchronous read, which could still be 
/// pending.</returns>
public IAsyncResult BeginRead(IntPtr pBuffer, UInt64 fileOffset, UInt32 numberOfBytesToRead, 
                              AsyncCallback callback, object state)
{
 CheckNotDisposed();

 AsyncJob job = null;
 try
 {
  job = new AsyncJob(callback, state, fileOffset, null);
  UInt32 numberOfBytesRead; bool result;
  unsafe
  {
   result = NativeMethods.ReadFile(_handle, pBuffer, numberOfBytesToRead, 
                                   out numberOfBytesRead, job.OverlappedPtr);
  }

  if (result)
   job.CompleteSynchronously();
  else
   CheckErrorPending();

  AsyncJob ret = job; job = null;
  return ret;
 }
 finally
 {
  if (job != null)
   job.Dispose();
 }
}

/// <summary>
/// Waits for the pending asynchronous operation to complete and frees resources.
/// </summary>
/// <param name="ar">The reference to the pending asynchronous request to wait for.</param>
/// <param name="throwOnError">When true, method throws <see cref="OverlappedStreamException"/> 
/// if any error detected.</param>
/// <returns>Number of bytes transferred</returns>
public UInt32 EndRead(IAsyncResult ar, bool throwOnError)
{
 CheckNotDisposed();

 AsyncJob job = ar as AsyncJob;
 if (job == null) throw new ArgumentException("Invalid argument", "ar");

 job.WaitForCompletion();
 if (throwOnError)
  ASSERT(job.ErrorCode == 0, unchecked((int)job.ErrorCode));
 return job.NumberOfBytesTransferred;
}

internal static void CheckErrorPending()
{
 int error = Marshal.GetLastWin32Error();
 if (error != NativeMethods.ERROR_IO_PENDING)
 {
  ASSERT(false, error);
 }
}
The documented behaviour of ReadFile/WriteFile in overlapped mode is that Windows queues a completion packet whether the operation completes synchronously or asynchronously; therefore, the AsyncJob that owns the Overlapped structure is freed in BeginRead/BeginWrite only on error.
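The ownership rule above can be modelled without any Windows API. This is a toy C++ sketch, not the OverlappedStream code: ToyCompletionPort, Job and IssueResult are hypothetical names. Because a packet is queued both on synchronous success and when the operation is pending, the completion side takes ownership in both cases; only a failed call leaves the job with the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>

enum class IssueResult { CompletedSynchronously, Pending, Failed };

struct Job {
    int id;  // stands in for the Overlapped structure and its callback state
};

// Toy stand-in for a completion port: it owns every job whose packet is queued.
class ToyCompletionPort {
public:
    // A packet is queued for synchronous completion AND for pending IO,
    // so in both cases ownership of the job moves to the completion side.
    // Only a failed call leaves the job with the caller, which must free it.
    bool issue(std::unique_ptr<Job>& job, IssueResult result) {
        if (result == IssueResult::Failed)
            return false;
        queue_.push_back(std::move(job));
        return true;
    }

    // Plays the role of the completion callback: dequeues one packet and
    // frees its job.
    int complete_one() {
        assert(!queue_.empty());
        std::unique_ptr<Job> job = std::move(queue_.front());
        queue_.pop_front();
        return job->id;  // the job is destroyed when `job` goes out of scope
    }

    std::size_t pending() const { return queue_.size(); }

private:
    std::deque<std::unique_ptr<Job>> queue_;
};
```

Freeing the job in BeginRead after a synchronous success as well would free it twice: once there and once more in the completion callback.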

AsyncJob is a class that allocates the Overlapped structure, handles the completion callback and implements IAsyncResult.
This might not have been the best solution, but I decided to avoid WaitHandle because it wraps an unmanaged resource (a kernel event), and I suspect that launching thousands of simultaneous IO operations might exhaust resources. That is just a guess. So I used the Monitor class and the well-known lock-pulse construction (see the AsyncJob.completionCallback method). I am thinking about redesigning this in a future version, as the IAsyncResult implementation is crippled: AsyncWaitHandle is always null, so there is no way to use WaitHandle.WaitAll. However, it is still possible to wait for completion of multiple async operations with System.Threading.CountdownEvent.
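For comparison, the same lock-and-pulse idea, plus a countdown for waiting on many operations, can be written in C++ with std::mutex and std::condition_variable. This is a sketch under assumptions: Completion and Countdown are hypothetical classes that mirror AsyncJob's WaitForCompletion/completionCallback pair and System.Threading.CountdownEvent; they are not part of the library.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Analogue of AsyncJob: a completion flag guarded by a mutex, with a
// condition variable playing the role of Monitor.Wait / Monitor.PulseAll.
class Completion {
public:
    // Called exactly once by the IO thread (the role of completionCallback).
    void complete(std::uint32_t error_code, std::uint32_t bytes) {
        std::lock_guard<std::mutex> lock(m_);
        assert(!done_);
        error_ = error_code;
        bytes_ = bytes;
        done_ = true;
        // notify while still holding the lock, so the last thing complete()
        // does with this object is release m_
        cv_.notify_all();
    }

    // Blocks until complete() has run (the role of WaitForCompletion);
    // the predicate form also guards against spurious wake-ups.
    std::uint32_t wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return done_; });
        return bytes_;
    }

    std::uint32_t error_code() {
        std::lock_guard<std::mutex> lock(m_);
        return error_;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool done_ = false;
    std::uint32_t error_ = 0;
    std::uint32_t bytes_ = 0;
};

// Analogue of System.Threading.CountdownEvent: wait() returns once
// signal() has been called `count` times.
class Countdown {
public:
    explicit Countdown(int count) : count_(count) { assert(count >= 0); }

    void signal() {
        std::lock_guard<std::mutex> lock(m_);
        assert(count_ > 0);
        if (--count_ == 0)
            cv_.notify_all();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ == 0; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};
```

In use, each completion callback would call signal() once, and the thread that issued N operations calls wait() before it exits, which also satisfies the rule that pending IO must finish before the issuing thread terminates.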

/// <summary>
/// Internal class, wraps Overlapped structure, completion port callback and IAsyncResult
/// </summary>
sealed class AsyncJob : IAsyncResult, IDisposable
{
 #region privates

 private readonly AsyncCallback _callback;
 private readonly object _state;

 private readonly object _eventHandle = new object();
 private bool _completedSynchronously = false;
 private bool _completed = false;
 private uint _numberOfBytesTransferred = 0;
 private uint _errorCode = 0;
 private readonly unsafe NativeOverlapped* _nativeOverlapped;

 #endregion

 /// <summary>
 /// Create instance, automatically allocates NativeOverlapped structure
 /// </summary>
 /// <param name="callback">User specified callback</param>
 /// <param name="state">User specified state</param>
 /// <param name="fileOffset">Start position</param>
 /// <param name="userData">An object or array of objects representing the input or output buffer 
 /// for the operation. Buffer is pinned until object is disposed.</param>
 public AsyncJob(AsyncCallback callback, object state, UInt64 fileOffset, object userData)
 {
  _callback = callback;
  _state = state;
  Overlapped ov = new Overlapped(unchecked((int)(fileOffset & 0xFFFFFFFF)), 
                                 unchecked((int)((fileOffset >> 32) & 0xFFFFFFFF)), IntPtr.Zero, this);
  unsafe { _nativeOverlapped = ov.UnsafePack(completionCallback, userData); }
 }

 #region IDisposable

 bool _disposed = false;
 public void Dispose()
 {
  if (_disposed) return;

  unsafe
  {
   Overlapped.Unpack(_nativeOverlapped);
   Overlapped.Free(_nativeOverlapped);
  }

  _disposed = true;

  GC.SuppressFinalize(this);
 }

 #endregion

 #region data accessors

 public unsafe NativeOverlapped* OverlappedPtr
 {
  get { return _nativeOverlapped; }
 }
 public uint NumberOfBytesTransferred
 {
  get { return _numberOfBytesTransferred; }
 }

 #endregion

 public void CompleteSynchronously()
 {
  _completedSynchronously = true;
 }

 public void WaitForCompletion()
 {
  lock (_eventHandle)
  {
   while (!_completed)
    Monitor.Wait(_eventHandle);
  }
 }

 public uint ErrorCode { get { return _errorCode; } }

 #region IAsyncResult Members

 public object AsyncState
 {
  get { return _state; }
 }

 public WaitHandle AsyncWaitHandle
 {
  get { return null; }
 }

 public bool CompletedSynchronously
 {
  get { return _completedSynchronously; }
 }

 public bool IsCompleted
 {
  get { return _completed; }
 }

 #endregion

 #region privates

 private unsafe void completionCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP)
 {
  try
  {
   if (errorCode != 0)
   {
    _log.ErrorFormat("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
   }

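   lock (_eventHandle)
   {
    System.Diagnostics.Debug.Assert(!_completed);

    _errorCode = errorCode;
    _numberOfBytesTransferred = numBytes;
    _completed = true;

    // wake every thread blocked in WaitForCompletion()
    Monitor.PulseAll(_eventHandle);
   }

   // run the user callback outside the lock: a slow callback no longer holds
   // up waiters, and an exception thrown by it can no longer prevent the pulse
   if (_callback != null)
    _callback.Invoke(this);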
  }
  catch (Exception ex)
  {
   _log.Error("OverlappedStream.completionCallback error", ex);
  }
  finally
  {
   this.Dispose();
  }
 }

 #endregion privates
}
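AsyncJob's constructor splits the 64-bit file offset into the two 32-bit halves that the Overlapped constructor (offsetLo, offsetHi) and the OVERLAPPED structure's Offset/OffsetHigh fields expect. Below is a small C++ sketch of the same split and its inverse; split_offset and join_offset are hypothetical names, and the casts to std::int32_t play the role of C#'s unchecked((int)...).

```cpp
#include <cstdint>

// The two 32-bit halves of a file offset, as stored in OVERLAPPED.Offset /
// OVERLAPPED.OffsetHigh and passed to Overlapped(offsetLo, offsetHi, ...).
struct SplitOffset {
    std::int32_t lo;
    std::int32_t hi;
};

// Splits a 64-bit offset. Converting a uint32 above INT32_MAX to int32 wraps
// (guaranteed since C++20 and what mainstream compilers did before), which
// matches C#'s unchecked((int)...).
constexpr SplitOffset split_offset(std::uint64_t offset) {
    return SplitOffset{
        static_cast<std::int32_t>(static_cast<std::uint32_t>(offset & 0xFFFFFFFFu)),
        static_cast<std::int32_t>(static_cast<std::uint32_t>(offset >> 32))};
}

// Reassembles the offset, treating both halves as unsigned again.
constexpr std::uint64_t join_offset(SplitOffset s) {
    return (static_cast<std::uint64_t>(static_cast<std::uint32_t>(s.hi)) << 32) |
           static_cast<std::uint32_t>(s.lo);
}

static_assert(join_offset(split_offset(0x123456789ABCDEF0ull)) == 0x123456789ABCDEF0ull,
              "split and join are inverse operations");
```

Note that an offset such as 0xFFFFFFFF produces a negative low half (-1); that is expected, since the kernel reinterprets both halves as unsigned DWORDs.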
Things TODO
I intend to implement WriteFileGather and ReadFileScatter, as they should be much more efficient when reading/writing sequential data pages.

Please see the next post for some usage samples. I am currently working on buffered overlapped IO support, which allows arbitrary buffer sizes and does not require buffer alignment, so that normal byte[] arrays can be passed to ReadFile/WriteFile. The class interface will be different.

Full source code can be found at https://overlappedstream.codeplex.com/
I hope the information above will be useful for implementing proper overlapped IO file operations in .NET.