using MediaBrowser.Common.Serialization;
using MediaBrowser.Model.Logging;
using System;
using System.Net;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace MediaBrowser.Common.Net
{
    /// <summary>
    /// Wraps an <see cref="IWebSocket"/>: forwards every received message to a caller-supplied
    /// callback and funnels outgoing messages through a semaphore so that only one send is in
    /// flight at a time.
    /// </summary>
    public class WebSocketConnection : IDisposable
    {
        /// <summary>
        /// The underlying socket that messages are sent to and received from.
        /// </summary>
        private readonly IWebSocket _socket;

        /// <summary>
        /// The remote end point supplied at construction; used in log messages.
        /// </summary>
        public readonly string RemoteEndPoint;

        /// <summary>
        /// Cancellation token source created with the connection and disposed with it.
        /// It is not otherwise used inside this class.
        /// </summary>
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

        /// <summary>
        /// Single-slot semaphore that serializes calls to the socket's send operation.
        /// </summary>
        private readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(1, 1);

        /// <summary>
        /// The logger used to report cancelled and failed operations.
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// Initializes a new instance of the <see cref="WebSocketConnection" /> class and
        /// hooks the socket's receive delegate up to <paramref name="receiveAction"/>.
        /// </summary>
        /// <param name="socket">The socket.</param>
        /// <param name="remoteEndPoint">The remote end point.</param>
        /// <param name="receiveAction">The action invoked for every received message.</param>
        /// <param name="logger">The logger.</param>
        /// <exception cref="System.ArgumentNullException">
        /// <paramref name="socket"/>, <paramref name="receiveAction"/> or <paramref name="logger"/>
        /// is null, or <paramref name="remoteEndPoint"/> is null or empty.
        /// </exception>
        public WebSocketConnection(IWebSocket socket, string remoteEndPoint, Action<WebSocketMessageInfo> receiveAction, ILogger logger)
        {
            if (socket == null)
            {
                throw new ArgumentNullException("socket");
            }
            if (string.IsNullOrEmpty(remoteEndPoint))
            {
                throw new ArgumentNullException("remoteEndPoint");
            }
            if (receiveAction == null)
            {
                throw new ArgumentNullException("receiveAction");
            }
            if (logger == null)
            {
                throw new ArgumentNullException("logger");
            }

            _socket = socket;
            _socket.OnReceiveDelegate = info => OnReceive(info, receiveAction);
            RemoteEndPoint = remoteEndPoint;
            _logger = logger;
        }

        /// <summary>
        /// Called when a message is received: stamps the message with this connection and
        /// passes it to the callback.
        /// </summary>
        /// <param name="info">The received message.</param>
        /// <param name="callback">The callback to invoke with the message.</param>
        private void OnReceive(WebSocketMessageInfo info, Action<WebSocketMessageInfo> callback)
        {
            try
            {
                info.Connection = this;

                callback(info);
            }
            catch (Exception ex)
            {
                // Failures in the handler are logged and swallowed rather than propagated
                // back into the socket's receive delegate.
                _logger.ErrorException("Error processing web socket message", ex);
            }
        }

        /// <summary>
        /// Serializes a message with <c>JsonSerializer.SerializeToBytes</c> and sends it
        /// asynchronously as a text frame.
        /// </summary>
        /// <typeparam name="T">The type of the message payload.</typeparam>
        /// <param name="message">The message.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>Task.</returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="message"/> is null.</exception>
        public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            var bytes = JsonSerializer.SerializeToBytes(message);

            return SendAsync(bytes, cancellationToken);
        }

        /// <summary>
        /// Sends a buffer asynchronously as a text message.
        /// </summary>
        /// <param name="buffer">The buffer.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>Task.</returns>
        public Task SendAsync(byte[] buffer, CancellationToken cancellationToken)
        {
            return SendAsync(buffer, WebSocketMessageType.Text, cancellationToken);
        }

        /// <summary>
        /// Sends a buffer asynchronously as a single, complete message of the given type.
        /// </summary>
        /// <param name="buffer">The buffer.</param>
        /// <param name="type">The message type.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>Task.</returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="System.OperationCanceledException">
        /// <paramref name="cancellationToken"/> was cancelled before or during the send.
        /// </exception>
        public async Task SendAsync(byte[] buffer, WebSocketMessageType type, CancellationToken cancellationToken)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            // CancellationToken is a struct and can never be null, so no null guard is needed.
            cancellationToken.ThrowIfCancellationRequested();

            // Per msdn docs, attempting to send simultaneous messages will result in one failing.
            // This should help us workaround that and ensure all messages get sent
            await _sendSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

            try
            {
                // Matches the ConfigureAwait(false) used on the semaphore wait above.
                await _socket.SendAsync(buffer, type, true, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                _logger.Info("WebSocket message to {0} was cancelled", RemoteEndPoint);

                throw;
            }
            catch (Exception ex)
            {
                _logger.ErrorException("Error sending WebSocket message {0}", ex, RemoteEndPoint);

                throw;
            }
            finally
            {
                // Always hand the slot back so a failed send cannot block later ones.
                _sendSemaphore.Release();
            }
        }

        /// <summary>
        /// Gets the state of the underlying socket.
        /// </summary>
        /// <value>The state.</value>
        public WebSocketState State
        {
            get { return _socket.State; }
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
        protected virtual void Dispose(bool dispose)
        {
            if (dispose)
            {
                _cancellationTokenSource.Dispose();
                _socket.Dispose();
            }
        }
    }

    /// <summary>
    /// A web socket message: a message type name plus a typed payload.
    /// </summary>
    /// <typeparam name="T">The type of the payload.</typeparam>
    public class WebSocketMessage<T>
    {
        /// <summary>
        /// Gets or sets the type of the message.
        /// </summary>
        /// <value>The type of the message.</value>
        public string MessageType { get; set; }

        /// <summary>
        /// Gets or sets the data.
        /// </summary>
        /// <value>The data.</value>
        public T Data { get; set; }
    }

    /// <summary>
    /// A received web socket message together with the connection it arrived on.
    /// </summary>
    // NOTE(review): the payload type argument is assumed to be string - confirm against the
    // IWebSocket implementation that creates these instances.
    public class WebSocketMessageInfo : WebSocketMessage<string>
    {
        /// <summary>
        /// Gets or sets the connection.
        /// </summary>
        /// <value>The connection.</value>
        public WebSocketConnection Connection { get; set; }
    }
}