From 7d3aa60db0800a6dd8a59dbdb9ce28e3ca06ba26 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Mon, 27 Mar 2017 15:31:24 -0400 Subject: [PATCH] update hdhr udp stream --- .../Library/MediaSourceManager.cs | 2 + .../TunerHosts/HdHomerun/HdHomerunHost.cs | 2 +- .../HdHomerun/HdHomerunUdpStream.cs | 125 +++++++++++++++++- .../LiveTv/TunerHosts/MulticastStream.cs | 37 ++++-- .../LiveTv/TunerHosts/QueueStream.cs | 39 +++++- 5 files changed, 187 insertions(+), 18 deletions(-) diff --git a/Emby.Server.Implementations/Library/MediaSourceManager.cs b/Emby.Server.Implementations/Library/MediaSourceManager.cs index c1bd8fe915..ccd4c3631e 100644 --- a/Emby.Server.Implementations/Library/MediaSourceManager.cs +++ b/Emby.Server.Implementations/Library/MediaSourceManager.cs @@ -369,6 +369,8 @@ namespace Emby.Server.Implementations.Library { await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + enableAutoClose = false; + try { var tuple = GetProvider(request.OpenToken); diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs index 2fac961698..8fa1bbe231 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs @@ -510,7 +510,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun // The UDP method is not working reliably on OSX, and on BSD it hasn't been tested yet var enableHttpStream = _environment.OperatingSystem == OperatingSystem.OSX || _environment.OperatingSystem == OperatingSystem.BSD; - + enableHttpStream = true; if (enableHttpStream) { mediaSource.Protocol = MediaProtocol.Http; diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index 92000a1b38..e1572ea3ff 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -130,7 +130,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun onStarted = () => openTaskCompletionSource.TrySetResult(true); } - await _multicastStream.CopyUntilCancelled(udpClient, onStarted, cancellationToken).ConfigureAwait(false); + await _multicastStream.CopyUntilCancelled(new UdpClientStream(udpClient), onStarted, cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException ex) @@ -167,4 +167,127 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun return _multicastStream.CopyToAsync(stream); } } + + // This handles the ReadAsync function only of a Stream object + // This is used to wrap a UDP socket into a stream for MulticastStream which only uses ReadAsync + public class UdpClientStream : Stream + { + private static int RtpHeaderBytes = 12; + private static int PacketSize = 1316; + private readonly ISocket _udpClient; + bool disposed; + + public UdpClientStream(ISocket udpClient) : base() + { + _udpClient = udpClient; + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (buffer == null) + throw new ArgumentNullException("buffer"); + + if (offset + count < 0) + throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count"); + + if (offset + count > buffer.Length) + throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count"); + + if (disposed) + throw new ObjectDisposedException(typeof(UdpClientStream).ToString()); + + // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize) + // The RTP header will be stripped so see how many reads we need to make to fill the buffer. + int numReads = count / PacketSize; + int totalBytesRead = 0; + + for (int i = 0; i < numReads; ++i) + { + var data = await _udpClient.ReceiveAsync(cancellationToken).ConfigureAwait(false); + + var bytesRead = data.ReceivedBytes - RtpHeaderBytes; + + // remove rtp header + Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead); + offset += bytesRead; + totalBytesRead += bytesRead; + } + return totalBytesRead; + } + + protected override void Dispose(bool disposing) + { + disposed = true; + } + + public override bool CanRead + { + get + { + throw new NotImplementedException(); + } + } + + public override bool CanSeek + { + get + { + throw new NotImplementedException(); + } + } + + public override bool CanWrite + { + get + { + throw new NotImplementedException(); + } + } + + public override long Length + { + get + { + throw new NotImplementedException(); + } + } + + public override long Position + { + get + { + throw new NotImplementedException(); + } + + set + { + throw new NotImplementedException(); + } + } + + public override void Flush() + { + throw new NotImplementedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + } } \ No newline at end of file diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs index e3d0d1eba3..281632590a 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs @@ -35,13 +35,21 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts if (bytesRead > 0) { - byte[] copy = new byte[bytesRead]; - Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead); - var allStreams = _outputStreams.ToList(); - foreach (var stream in allStreams) + + if (allStreams.Count == 1) { - stream.Value.Queue(copy, 0, copy.Length); + await allStreams[0].Value.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false); + } + else + { + byte[] copy = new byte[bytesRead]; + Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead); + + foreach (var stream in allStreams) + { + stream.Value.Queue(copy, 0, copy.Length); + } } if (onStarted != null) @@ -79,14 +87,21 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts if (bytesRead > 0) { - byte[] copy = new byte[bytesRead]; - Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, copy, 0, bytesRead); - var allStreams = _outputStreams.ToList(); - foreach (var stream in allStreams) + + if (allStreams.Count == 1) { - //stream.Value.Queue(data.Buffer, RtpHeaderBytes, bytesRead); - stream.Value.Queue(copy, 0, copy.Length); + await allStreams[0].Value.WriteAsync(data.Buffer, 0, bytesRead).ConfigureAwait(false); + } + else + { + byte[] copy = new byte[bytesRead]; + Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, copy, 0, bytesRead); + + foreach (var stream in allStreams) + { + stream.Value.Queue(copy, 0, copy.Length); + } } if (onStarted != null) diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs index 27dd288a79..543d2e373e 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs @@ -13,7 +13,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts public class QueueStream { private readonly Stream _outputStream; - private readonly ConcurrentQueue> _queue = new ConcurrentQueue>(); + private readonly ConcurrentQueue> _queue = new ConcurrentQueue>(); private CancellationToken _cancellationToken; public TaskCompletionSource TaskCompletion { get; private set; } @@ -50,6 +50,38 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts return null; } + private void OnClosed() + { + GC.Collect(); + if (OnFinished != null) + { + OnFinished(this); + } + } + + public async Task WriteAsync(byte[] bytes, int offset, int count) + { + //return _outputStream.WriteAsync(bytes, offset, count, cancellationToken); + var cancellationToken = _cancellationToken; + + try + { + await _outputStream.WriteAsync(bytes, offset, count, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + _logger.Debug("QueueStream cancelled"); + TaskCompletion.TrySetCanceled(); + OnClosed(); + } + catch (Exception ex) + { + _logger.ErrorException("Error in QueueStream", ex); + TaskCompletion.TrySetException(ex); + OnClosed(); + } + } + private async Task StartInternal() { var cancellationToken = _cancellationToken; @@ -81,10 +113,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts } finally { - if (OnFinished != null) - { - OnFinished(this); - } + OnClosed(); } } }