diff --git a/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs b/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs index 6b516977e8..366301d3ee 100644 --- a/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs +++ b/Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs @@ -123,9 +123,8 @@ namespace Jellyfin.Api.Helpers state.Dispose(); } - await new ProgressiveFileCopier(outputPath, job, transcodingJobHelper, CancellationToken.None) - .WriteToAsync(httpContext.Response.Body, CancellationToken.None).ConfigureAwait(false); - return new FileStreamResult(httpContext.Response.Body, contentType); + var stream = new ProgressiveFileStream(outputPath, job, transcodingJobHelper); + return new FileStreamResult(stream, contentType); } finally { diff --git a/Jellyfin.Api/Helpers/ProgressiveFileStream.cs b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs new file mode 100644 index 0000000000..e09f3dca94 --- /dev/null +++ b/Jellyfin.Api/Helpers/ProgressiveFileStream.cs @@ -0,0 +1,164 @@ +using System; +using System.IO; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Jellyfin.Api.Models.PlaybackDtos; +using MediaBrowser.Model.IO; + +namespace Jellyfin.Api.Helpers +{ + /// + /// A progressive file stream for transferring transcoded files as they are written to. + /// + public class ProgressiveFileStream : Stream + { + private readonly FileStream _fileStream; + private readonly TranscodingJobDto? _job; + private readonly TranscodingJobHelper _transcodingJobHelper; + private readonly bool _allowAsyncFileRead; + private int _bytesWritten; + private bool _disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The path to the transcoded file. + /// The transcoding job information. + /// The transcoding job helper. + public ProgressiveFileStream(string filePath, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper) + { + _job = job; + _transcodingJobHelper = transcodingJobHelper; + _bytesWritten = 0; + + var fileOptions = FileOptions.SequentialScan; + _allowAsyncFileRead = false; + + // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 + if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + fileOptions |= FileOptions.Asynchronous; + _allowAsyncFileRead = true; + } + + _fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions); + } + + /// + public override bool CanRead => _fileStream.CanRead; + + /// + public override bool CanSeek => false; + + /// + public override bool CanWrite => false; + + /// + public override long Length => throw new NotSupportedException(); + + /// + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + public override void Flush() + { + _fileStream.Flush(); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + return _fileStream.Read(buffer, offset, count); + } + + /// + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var eofCount = 0; + const int EmptyReadLimit = 20; + + int totalBytesRead = 0; + int remainingBytesToRead = count; + + while (eofCount < EmptyReadLimit && remainingBytesToRead > 0) + { + cancellationToken.ThrowIfCancellationRequested(); + int bytesRead; + if (_allowAsyncFileRead) + { + bytesRead = await _fileStream.ReadAsync(buffer, offset, remainingBytesToRead, cancellationToken).ConfigureAwait(false); + } + else + { + bytesRead = _fileStream.Read(buffer, offset, remainingBytesToRead); + } + + remainingBytesToRead -= bytesRead; + if (bytesRead > 0) + { + _bytesWritten += bytesRead; + totalBytesRead += bytesRead; + + if (_job != null) + { + _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten); + } + } + + if (bytesRead == 0) + { + if (_job == null || _job.HasExited) + { + eofCount++; + } + + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } + else + { + eofCount = 0; + } + } + + return totalBytesRead; + } + + /// + public override long Seek(long offset, SeekOrigin origin) + => throw new NotSupportedException(); + + /// + public override void SetLength(long value) + => throw new NotSupportedException(); + + /// + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + /// + protected override void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + if (disposing) + { + _fileStream.Dispose(); + + if (_job != null) + { + _transcodingJobHelper.OnTranscodeEndRequest(_job); + } + } + + _disposed = true; + } + } +}