From caaa906604c759e6899ebf6be6f5ac4f9845db84 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Sun, 26 Mar 2017 15:00:35 -0400 Subject: [PATCH] update socket methods --- Emby.Common.Implementations/Net/UdpSocket.cs | 256 +++++++------------ Emby.Server.Core/IO/LibraryMonitor.cs | 11 - Emby.Server.Implementations/Udp/UdpServer.cs | 26 +- MediaBrowser.Model/Net/ISocket.cs | 1 + RSSDP/SsdpCommunicationsServer.cs | 10 +- 5 files changed, 107 insertions(+), 197 deletions(-) diff --git a/Emby.Common.Implementations/Net/UdpSocket.cs b/Emby.Common.Implementations/Net/UdpSocket.cs index 45cb42ecda..94d073bd26 100644 --- a/Emby.Common.Implementations/Net/UdpSocket.cs +++ b/Emby.Common.Implementations/Net/UdpSocket.cs @@ -19,12 +19,20 @@ namespace Emby.Common.Implementations.Net private Socket _Socket; private int _LocalPort; - private SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs() + private readonly SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs() + { + SocketFlags = SocketFlags.None + }; + + private readonly SocketAsyncEventArgs _sendSocketAsyncEventArgs = new SocketAsyncEventArgs() { SocketFlags = SocketFlags.None }; private TaskCompletionSource _currentReceiveTaskCompletionSource; + private TaskCompletionSource _currentSendTaskCompletionSource; + + private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1); public UdpSocket(Socket socket, int localPort, IPAddress ip) { @@ -41,9 +49,13 @@ namespace Emby.Common.Implementations.Net private void InitReceiveSocketAsyncEventArgs() { - var buffer = new byte[8192]; - _receiveSocketAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length); + var receiveBuffer = new byte[8192]; + _receiveSocketAsyncEventArgs.SetBuffer(receiveBuffer, 0, receiveBuffer.Length); _receiveSocketAsyncEventArgs.Completed += _receiveSocketAsyncEventArgs_Completed; + + var sendBuffer = new byte[8192]; + _sendSocketAsyncEventArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length); + _sendSocketAsyncEventArgs.Completed += _sendSocketAsyncEventArgs_Completed; } private void _receiveSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e) @@ -53,13 +65,38 @@ namespace Emby.Common.Implementations.Net { _currentReceiveTaskCompletionSource = null; - tcs.TrySetResult(new SocketReceiveResult + if (e.SocketError == SocketError.Success) { - Buffer = e.Buffer, - ReceivedBytes = e.BytesTransferred, - RemoteEndPoint = ToIpEndPointInfo(e.RemoteEndPoint as IPEndPoint), - LocalIPAddress = LocalIPAddress - }); + tcs.TrySetResult(new SocketReceiveResult + { + Buffer = e.Buffer, + ReceivedBytes = e.BytesTransferred, + RemoteEndPoint = ToIpEndPointInfo(e.RemoteEndPoint as IPEndPoint), + LocalIPAddress = LocalIPAddress + }); + } + else + { + tcs.TrySetException(new Exception("SocketError: " + e.SocketError)); + } + } + } + + private void _sendSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e) + { + var tcs = _currentSendTaskCompletionSource; + if (tcs != null) + { + _currentSendTaskCompletionSource = null; + + if (e.SocketError == SocketError.Success) + { + tcs.TrySetResult(e.BytesTransferred); + } + else + { + tcs.TrySetException(new Exception("SocketError: " + e.SocketError)); + } } } @@ -79,8 +116,6 @@ namespace Emby.Common.Implementations.Net private set; } - #region ISocket Members - public Task ReceiveAsync(CancellationToken cancellationToken) { ThrowIfDisposed(); @@ -90,31 +125,15 @@ namespace Emby.Common.Implementations.Net EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0); cancellationToken.Register(() => tcs.TrySetCanceled()); -#if NETSTANDARD1_6 - var state = new AsyncReceiveState(_Socket, receivedFromEndPoint); - state.TaskCompletionSource = tcs; - - _Socket.ReceiveFromAsync(new ArraySegment(state.Buffer), SocketFlags.None, state.RemoteEndPoint) - .ContinueWith((task, asyncState) => - { - if (task.Status != TaskStatus.Faulted) - { - var receiveState = asyncState as AsyncReceiveState; - receiveState.RemoteEndPoint = task.Result.RemoteEndPoint; - ProcessResponse(receiveState, () => task.Result.ReceivedBytes, LocalIPAddress); - } - }, state); -#else - //var state = new AsyncReceiveState(_Socket, receivedFromEndPoint); - //state.TaskCompletionSource = tcs; - - //_Socket.BeginReceiveFrom(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ref state.RemoteEndPoint, ProcessResponse, state); - _receiveSocketAsyncEventArgs.RemoteEndPoint = receivedFromEndPoint; _currentReceiveTaskCompletionSource = tcs; - var isPending = _Socket.ReceiveFromAsync(_receiveSocketAsyncEventArgs); -#endif + var willRaiseEvent = _Socket.ReceiveFromAsync(_receiveSocketAsyncEventArgs); + + if (!willRaiseEvent) + { + _receiveSocketAsyncEventArgs_Completed(this, _receiveSocketAsyncEventArgs); + } return tcs.Task; } @@ -126,60 +145,42 @@ namespace Emby.Common.Implementations.Net if (buffer == null) throw new ArgumentNullException("messageData"); if (endPoint == null) throw new ArgumentNullException("endPoint"); - var ipEndPoint = NetworkManager.ToIPEndPoint(endPoint); - -#if NETSTANDARD1_6 - - if (size != buffer.Length) - { - byte[] copy = new byte[size]; - Buffer.BlockCopy(buffer, 0, copy, 0, size); - buffer = copy; - } - cancellationToken.ThrowIfCancellationRequested(); - _Socket.SendTo(buffer, ipEndPoint); - return Task.FromResult(true); -#else - var taskSource = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(); + + cancellationToken.Register(() => tcs.TrySetCanceled()); + + _sendSocketAsyncEventArgs.SetBuffer(buffer, 0, size); + _sendSocketAsyncEventArgs.RemoteEndPoint = NetworkManager.ToIPEndPoint(endPoint); + _currentSendTaskCompletionSource = tcs; + + var willRaiseEvent = _Socket.SendAsync(_sendSocketAsyncEventArgs); + + if (!willRaiseEvent) + { + _sendSocketAsyncEventArgs_Completed(this, _sendSocketAsyncEventArgs); + } + + return tcs.Task; + } + + public async Task SendWithLockAsync(byte[] buffer, int size, IpEndPointInfo endPoint, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + + await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { - _Socket.BeginSendTo(buffer, 0, size, SocketFlags.None, ipEndPoint, result => - { - if (cancellationToken.IsCancellationRequested) - { - taskSource.TrySetCanceled(); - return; - } - try - { - _Socket.EndSend(result); - taskSource.TrySetResult(true); - } - catch (Exception ex) - { - taskSource.TrySetException(ex); - } - - }, null); + await SendAsync(buffer, size, endPoint, cancellationToken).ConfigureAwait(false); } - catch (Exception ex) + finally { - taskSource.TrySetException(ex); + _sendLock.Release(); } - - //_Socket.SendTo(messageData, new System.Net.IPEndPoint(IPAddress.Parse(RemoteEndPoint.IPAddress), RemoteEndPoint.Port)); - - return taskSource.Task; -#endif } - #endregion - - #region Overrides - protected override void Dispose(bool disposing) { if (disposing) @@ -187,44 +188,19 @@ namespace Emby.Common.Implementations.Net var socket = _Socket; if (socket != null) socket.Dispose(); - } - } - #endregion + _sendLock.Dispose(); - #region Private Methods - - private static void ProcessResponse(AsyncReceiveState state, Func receiveData, IpAddressInfo localIpAddress) - { - try - { - var bytesRead = receiveData(); - - var ipEndPoint = state.RemoteEndPoint as IPEndPoint; - state.TaskCompletionSource.TrySetResult( - new SocketReceiveResult - { - Buffer = state.Buffer, - ReceivedBytes = bytesRead, - RemoteEndPoint = ToIpEndPointInfo(ipEndPoint), - LocalIPAddress = localIpAddress - } - ); - } - catch (ObjectDisposedException) - { - state.TaskCompletionSource.TrySetCanceled(); - } - catch (SocketException se) - { - if (se.SocketErrorCode != SocketError.Interrupted && se.SocketErrorCode != SocketError.OperationAborted && se.SocketErrorCode != SocketError.Shutdown) - state.TaskCompletionSource.TrySetException(se); - else - state.TaskCompletionSource.TrySetCanceled(); - } - catch (Exception ex) - { - state.TaskCompletionSource.TrySetException(ex); + var tcs = _currentReceiveTaskCompletionSource; + if (tcs != null) + { + tcs.TrySetCanceled(); + } + var sendTcs = _currentSendTaskCompletionSource; + if (sendTcs != null) + { + sendTcs.TrySetCanceled(); + } } } @@ -237,59 +213,5 @@ namespace Emby.Common.Implementations.Net return NetworkManager.ToIpEndPointInfo(endpoint); } - - private void ProcessResponse(IAsyncResult asyncResult) - { -#if NET46 - var state = asyncResult.AsyncState as AsyncReceiveState; - try - { - var bytesRead = state.Socket.EndReceiveFrom(asyncResult, ref state.RemoteEndPoint); - - var ipEndPoint = state.RemoteEndPoint as IPEndPoint; - state.TaskCompletionSource.TrySetResult( - new SocketReceiveResult - { - Buffer = state.Buffer, - ReceivedBytes = bytesRead, - RemoteEndPoint = ToIpEndPointInfo(ipEndPoint), - LocalIPAddress = LocalIPAddress - } - ); - } - catch (ObjectDisposedException) - { - state.TaskCompletionSource.TrySetCanceled(); - } - catch (Exception ex) - { - state.TaskCompletionSource.TrySetException(ex); - } -#endif - } - - #endregion - - #region Private Classes - - private class AsyncReceiveState - { - public AsyncReceiveState(Socket socket, EndPoint remoteEndPoint) - { - this.Socket = socket; - this.RemoteEndPoint = remoteEndPoint; - } - - public EndPoint RemoteEndPoint; - public byte[] Buffer = new byte[8192]; - - public Socket Socket { get; private set; } - - public TaskCompletionSource TaskCompletionSource { get; set; } - - } - - #endregion - } } diff --git a/Emby.Server.Core/IO/LibraryMonitor.cs b/Emby.Server.Core/IO/LibraryMonitor.cs index 4df9b930e5..e1e3186c3b 100644 --- a/Emby.Server.Core/IO/LibraryMonitor.cs +++ b/Emby.Server.Core/IO/LibraryMonitor.cs @@ -421,17 +421,6 @@ namespace Emby.Server.Core.IO var path = e.FullPath; - // For deletes, use the parent path - if (e.ChangeType == WatcherChangeTypes.Deleted) - { - var parentPath = Path.GetDirectoryName(path); - - if (!string.IsNullOrWhiteSpace(parentPath)) - { - path = parentPath; - } - } - ReportFileSystemChanged(path); } catch (Exception ex) diff --git a/Emby.Server.Implementations/Udp/UdpServer.cs b/Emby.Server.Implementations/Udp/UdpServer.cs index bb303d8fae..21ef3cab64 100644 --- a/Emby.Server.Implementations/Udp/UdpServer.cs +++ b/Emby.Server.Implementations/Udp/UdpServer.cs @@ -203,19 +203,6 @@ namespace Emby.Server.Implementations.Udp GC.SuppressFinalize(this); } - /// - /// Stops this instance. - /// - public void Stop() - { - _isDisposed = true; - - if (_udpClient != null) - { - _udpClient.Dispose(); - } - } - /// /// Releases unmanaged and - optionally - managed resources. /// @@ -224,7 +211,12 @@ namespace Emby.Server.Implementations.Udp { if (dispose) { - Stop(); + _isDisposed = true; + + if (_udpClient != null) + { + _udpClient.Dispose(); + } } } @@ -247,9 +239,13 @@ namespace Emby.Server.Implementations.Udp try { - await _udpClient.SendAsync(bytes, bytes.Length, remoteEndPoint, CancellationToken.None).ConfigureAwait(false); + await _udpClient.SendWithLockAsync(bytes, bytes.Length, remoteEndPoint, CancellationToken.None).ConfigureAwait(false); _logger.Info("Udp message sent to {0}", remoteEndPoint); + } + catch (OperationCanceledException) + { + } catch (Exception ex) { diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs index 90070b1285..61fc0e28bf 100644 --- a/MediaBrowser.Model/Net/ISocket.cs +++ b/MediaBrowser.Model/Net/ISocket.cs @@ -24,5 +24,6 @@ namespace MediaBrowser.Model.Net /// Sends a UDP message to a particular end point (uni or multicast). /// Task SendAsync(byte[] buffer, int bytes, IpEndPointInfo endPoint, CancellationToken cancellationToken); + Task SendWithLockAsync(byte[] buffer, int bytes, IpEndPointInfo endPoint, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/RSSDP/SsdpCommunicationsServer.cs b/RSSDP/SsdpCommunicationsServer.cs index 99e3969aad..cc464e689c 100644 --- a/RSSDP/SsdpCommunicationsServer.cs +++ b/RSSDP/SsdpCommunicationsServer.cs @@ -177,11 +177,15 @@ namespace Rssdp.Infrastructure { try { - await socket.SendAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false); + await socket.SendWithLockAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false); } catch (ObjectDisposedException) { + } + catch (OperationCanceledException) + { + } catch (Exception ex) { @@ -341,11 +345,9 @@ namespace Rssdp.Infrastructure foreach (var socket in sockets) { - await socket.SendAsync(messageData, messageData.Length, destination, cancellationToken).ConfigureAwait(false); + await SendFromSocket(socket, messageData, destination, cancellationToken).ConfigureAwait(false); } } - - ThrowIfDisposed(); } private ISocket ListenForBroadcastsAsync()