jellyfin/Emby.Server.Implementations/IO/AsyncStreamCopier.cs
2017-05-25 09:00:14 -04:00

460 lines
16 KiB
C#

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Emby.Server.Implementations.IO
{
public class AsyncStreamCopier : IDisposable
{
// size in bytes of the buffers in the buffer pool
private const int DefaultBufferSize = 81920;
private readonly int _bufferSize;
// number of buffers in the pool
private const int DefaultBufferCount = 4;
private readonly int _bufferCount;
// indexes of the next buffer to read into/write from
private int _nextReadBuffer = -1;
private int _nextWriteBuffer = -1;
// the buffer pool, implemented as an array, and used in a cyclic way
private readonly byte[][] _buffers;
// sizes in bytes of the available (read) data in the buffers
private readonly int[] _sizes;
// the streams...
private Stream _source;
private Stream _target;
private readonly bool _closeStreamsOnEnd;
// number of buffers that are ready to be written
private int _buffersToWrite;
// flag indicating that there is still a read operation to be scheduled
// (source end of stream not reached)
private volatile bool _moreDataToRead;
// the result of the whole operation, returned by BeginCopy()
private AsyncResult _asyncResult;
// any exception that occurs during an async operation
// stored here for rethrow
private Exception _exception;
public TaskCompletionSource<long> TaskCompletionSource;
private long _bytesToRead;
private long _totalBytesWritten;
private CancellationToken _cancellationToken;
public int IndividualReadOffset = 0;
public AsyncStreamCopier(Stream source,
Stream target,
long bytesToRead,
CancellationToken cancellationToken,
bool closeStreamsOnEnd = false,
int bufferSize = DefaultBufferSize,
int bufferCount = DefaultBufferCount)
{
if (source == null)
throw new ArgumentNullException("source");
if (target == null)
throw new ArgumentNullException("target");
if (!source.CanRead)
throw new ArgumentException("Cannot copy from a non-readable stream.");
if (!target.CanWrite)
throw new ArgumentException("Cannot copy to a non-writable stream.");
_source = source;
_target = target;
_moreDataToRead = true;
_closeStreamsOnEnd = closeStreamsOnEnd;
_bufferSize = bufferSize;
_bufferCount = bufferCount;
_buffers = new byte[_bufferCount][];
_sizes = new int[_bufferCount];
_bytesToRead = bytesToRead;
_cancellationToken = cancellationToken;
}
~AsyncStreamCopier()
{
// ensure any exception cannot be ignored
ThrowExceptionIfNeeded();
}
public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{
return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);
}
public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
{
var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);
var taskCompletion = new TaskCompletionSource<long>();
copier.TaskCompletionSource = taskCompletion;
var result = copier.BeginCopy(StreamCopyCallback, copier);
if (result.CompletedSynchronously)
{
StreamCopyCallback(result);
}
cancellationToken.Register(() => taskCompletion.TrySetCanceled());
return taskCompletion.Task;
}
private static void StreamCopyCallback(IAsyncResult result)
{
var copier = (AsyncStreamCopier)result.AsyncState;
var taskCompletion = copier.TaskCompletionSource;
try
{
copier.EndCopy(result);
taskCompletion.TrySetResult(copier._totalBytesWritten);
}
catch (Exception ex)
{
taskCompletion.TrySetException(ex);
}
}
public void Dispose()
{
if (_asyncResult != null)
_asyncResult.Dispose();
if (_closeStreamsOnEnd)
{
if (_source != null)
{
_source.Dispose();
_source = null;
}
if (_target != null)
{
_target.Dispose();
_target = null;
}
}
GC.SuppressFinalize(this);
ThrowExceptionIfNeeded();
}
public IAsyncResult BeginCopy(AsyncCallback callback, object state)
{
// avoid concurrent start of the copy on separate threads
if (Interlocked.CompareExchange(ref _asyncResult, new AsyncResult(callback, state), null) != null)
throw new InvalidOperationException("A copy operation has already been started on this object.");
// allocate buffers
for (int i = 0; i < _bufferCount; i++)
_buffers[i] = new byte[_bufferSize];
// we pass false to BeginRead() to avoid completing the async result
// immediately which would result in invoking the callback
// when the method fails synchronously
BeginRead(false);
// throw exception synchronously if there is one
ThrowExceptionIfNeeded();
return _asyncResult;
}
public void EndCopy(IAsyncResult ar)
{
if (ar != _asyncResult)
throw new InvalidOperationException("Invalid IAsyncResult object.");
if (!_asyncResult.IsCompleted)
_asyncResult.AsyncWaitHandle.WaitOne();
if (_closeStreamsOnEnd)
{
_source.Close();
_source = null;
_target.Close();
_target = null;
}
//_logger.Info("AsyncStreamCopier {0} bytes requested. {1} bytes transferred", _bytesToRead, _totalBytesWritten);
ThrowExceptionIfNeeded();
}
/// <summary>
/// Here we'll throw a pending exception if there is one,
/// and remove it from our instance, so we know it has been consumed.
/// </summary>
private void ThrowExceptionIfNeeded()
{
if (_exception != null)
{
var exception = _exception;
_exception = null;
throw exception;
}
}
private void BeginRead(bool completeOnError = true)
{
if (!_moreDataToRead)
{
return;
}
if (_asyncResult.IsCompleted)
return;
int bufferIndex = Interlocked.Increment(ref _nextReadBuffer) % _bufferCount;
try
{
_source.BeginRead(_buffers[bufferIndex], 0, _bufferSize, EndRead, bufferIndex);
}
catch (Exception exception)
{
_exception = exception;
if (completeOnError)
_asyncResult.Complete(false);
}
}
private void BeginWrite()
{
if (_asyncResult.IsCompleted)
return;
// this method can actually be called concurrently!!
// indeed, let's say we call a BeginWrite, and the thread gets interrupted
// just after making the IO request.
// At that moment, the thread is still in the method. And then the IO request
// ends (extremely fast io, or caching...), EndWrite gets called
// on another thread, and calls BeginWrite again! There we have it!
// That is the reason why an Interlocked is needed here.
int bufferIndex = Interlocked.Increment(ref _nextWriteBuffer) % _bufferCount;
try
{
int bytesToWrite;
if (_bytesToRead > 0)
{
var bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
bytesToWrite = Convert.ToInt32(Math.Min(_sizes[bufferIndex], bytesLeftToWrite));
}
else
{
bytesToWrite = _sizes[bufferIndex];
}
_target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);
_totalBytesWritten += bytesToWrite;
}
catch (Exception exception)
{
_exception = exception;
_asyncResult.Complete(false);
}
}
private void EndRead(IAsyncResult ar)
{
try
{
int read = _source.EndRead(ar);
_moreDataToRead = read > 0;
var bufferIndex = (int)ar.AsyncState;
_sizes[bufferIndex] = read;
}
catch (Exception exception)
{
_exception = exception;
_asyncResult.Complete(false);
return;
}
if (_moreDataToRead && !_cancellationToken.IsCancellationRequested)
{
int usedBuffers = Interlocked.Increment(ref _buffersToWrite);
// if we incremented from zero to one, then it means we just
// added the single buffer to write, so a writer could not
// be busy, and we have to schedule one.
if (usedBuffers == 1)
BeginWrite();
// test if there is at least a free buffer, and schedule
// a read, as we have read some data
if (usedBuffers < _bufferCount)
BeginRead();
}
else
{
// we did not add a buffer, because no data was read, and
// there is no buffer left to write so this is the end...
if (Thread.VolatileRead(ref _buffersToWrite) == 0)
{
_asyncResult.Complete(false);
}
}
}
private void EndWrite(IAsyncResult ar)
{
try
{
_target.EndWrite(ar);
}
catch (Exception exception)
{
_exception = exception;
_asyncResult.Complete(false);
return;
}
int buffersLeftToWrite = Interlocked.Decrement(ref _buffersToWrite);
// no reader could be active if all buffers were full of data waiting to be written
bool noReaderIsBusy = buffersLeftToWrite == _bufferCount - 1;
// note that it is possible that both a reader and
// a writer see the end of the copy and call Complete
// on the _asyncResult object. That race condition is handled by
// Complete that ensures it is only executed fully once.
long bytesLeftToWrite;
if (_bytesToRead > 0)
{
bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
}
else
{
bytesLeftToWrite = 1;
}
if (!_moreDataToRead || bytesLeftToWrite <= 0 || _cancellationToken.IsCancellationRequested)
{
// at this point we know no reader can schedule a read or write
if (Thread.VolatileRead(ref _buffersToWrite) == 0)
{
// nothing left to write, so it is the end
_asyncResult.Complete(false);
return;
}
}
else
// here, we know we have something left to read,
// so schedule a read if no read is busy
if (noReaderIsBusy)
BeginRead();
// also schedule a write if we are sure we did not write the last buffer
// note that if buffersLeftToWrite is zero and a reader has put another
// buffer to write between the time we decremented _buffersToWrite
// and now, that reader will also schedule another write,
// as it will increment _buffersToWrite from zero to one
if (buffersLeftToWrite > 0)
BeginWrite();
}
}
internal class AsyncResult : IAsyncResult, IDisposable
{
// Fields set at construction which never change while
// operation is pending
private readonly AsyncCallback _asyncCallback;
private readonly object _asyncState;
// Fields set at construction which do change after
// operation completes
private const int StatePending = 0;
private const int StateCompletedSynchronously = 1;
private const int StateCompletedAsynchronously = 2;
private int _completedState = StatePending;
// Field that may or may not get set depending on usage
private ManualResetEvent _waitHandle;
internal AsyncResult(
AsyncCallback asyncCallback,
object state)
{
_asyncCallback = asyncCallback;
_asyncState = state;
}
internal bool Complete(bool completedSynchronously)
{
bool result = false;
// The _completedState field MUST be set prior calling the callback
int prevState = Interlocked.CompareExchange(ref _completedState,
completedSynchronously ? StateCompletedSynchronously :
StateCompletedAsynchronously, StatePending);
if (prevState == StatePending)
{
// If the event exists, set it
if (_waitHandle != null)
_waitHandle.Set();
if (_asyncCallback != null)
_asyncCallback(this);
result = true;
}
return result;
}
#region Implementation of IAsyncResult
public Object AsyncState { get { return _asyncState; } }
public bool CompletedSynchronously
{
get
{
return Thread.VolatileRead(ref _completedState) ==
StateCompletedSynchronously;
}
}
public WaitHandle AsyncWaitHandle
{
get
{
if (_waitHandle == null)
{
bool done = IsCompleted;
var mre = new ManualResetEvent(done);
if (Interlocked.CompareExchange(ref _waitHandle,
mre, null) != null)
{
// Another thread created this object's event; dispose
// the event we just created
mre.Close();
}
else
{
if (!done && IsCompleted)
{
// If the operation wasn't done when we created
// the event but now it is done, set the event
_waitHandle.Set();
}
}
}
return _waitHandle;
}
}
public bool IsCompleted
{
get
{
return Thread.VolatileRead(ref _completedState) !=
StatePending;
}
}
#endregion
public void Dispose()
{
if (_waitHandle != null)
{
_waitHandle.Dispose();
_waitHandle = null;
}
}
}
}