#nullable enable using System; using System.Buffers; using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using MediaBrowser.Common.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.HttpServer { /// /// Class WebSocketConnection. /// public class WebSocketConnection : IWebSocketConnection { /// /// The logger. /// private readonly ILogger _logger; /// /// The json serializer options. /// private readonly JsonSerializerOptions _jsonOptions; /// /// The socket. /// private readonly WebSocket _socket; private bool _disposed = false; /// /// Initializes a new instance of the class. /// /// The logger. /// The socket. /// The remote end point. /// The query. public WebSocketConnection( ILogger logger, WebSocket socket, IPAddress? remoteEndPoint, IQueryCollection query) { _logger = logger; _socket = socket; RemoteEndPoint = remoteEndPoint; QueryString = query; _jsonOptions = JsonDefaults.GetOptions(); LastActivityDate = DateTime.Now; } /// public event EventHandler? Closed; /// /// Gets or sets the remote end point. /// public IPAddress? RemoteEndPoint { get; } /// /// Gets or sets the receive action. /// /// The receive action. public Func? OnReceive { get; set; } /// /// Gets the last activity date. /// /// The last activity date. public DateTime LastActivityDate { get; private set; } /// /// Gets or sets the query string. /// /// The query string. public IQueryCollection QueryString { get; } /// /// Gets the state. /// /// The state. public WebSocketState State => _socket.State; /// /// Sends a message asynchronously. /// /// /// The message. /// The cancellation token. /// Task. /// message public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) { var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); } /// public async Task ProcessAsync(CancellationToken cancellationToken = default) { var pipe = new Pipe(); var writer = pipe.Writer; ValueWebSocketReceiveResult receiveresult; do { // Allocate at least 512 bytes from the PipeWriter Memory memory = writer.GetMemory(512); receiveresult = await _socket.ReceiveAsync(memory, cancellationToken); int bytesRead = receiveresult.Count; if (bytesRead == 0) { break; } // Tell the PipeWriter how much was read from the Socket writer.Advance(bytesRead); // Make the data available to the PipeReader FlushResult flushResult = await writer.FlushAsync(); if (flushResult.IsCompleted) { // The PipeReader stopped reading break; } LastActivityDate = DateTime.UtcNow; if (receiveresult.EndOfMessage) { await ProcessInternal(pipe.Reader).ConfigureAwait(false); } } while (_socket.State == WebSocketState.Open && receiveresult.MessageType != WebSocketMessageType.Close); if (_socket.State == WebSocketState.Open) { _logger.LogWarning("Stopped reading from websocket before it was closed"); } Closed?.Invoke(this, EventArgs.Empty); _socket.Dispose(); } private async Task ProcessInternal(PipeReader reader) { if (OnReceive == null) { return; } try { var result = await reader.ReadAsync().ConfigureAwait(false); if (!result.IsCompleted) { return; } WebSocketMessage stub; var buffer = result.Buffer; if (buffer.IsSingleSegment) { stub = JsonSerializer.Deserialize>(buffer.FirstSpan, _jsonOptions); } else { var buf = ArrayPool.Shared.Rent(Convert.ToInt32(buffer.Length)); try { buffer.CopyTo(buf); stub = JsonSerializer.Deserialize>(buf, _jsonOptions); } finally { ArrayPool.Shared.Return(buf); } } var info = new WebSocketMessageInfo { MessageType = stub.MessageType, Data = stub.Data.ToString(), Connection = this }; await OnReceive(info).ConfigureAwait(false); } catch (JsonException ex) { _logger.LogError(ex, "Error processing web socket message"); } } /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Releases unmanaged and - optionally - managed resources. /// /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { if (_disposed) { return; } if (dispose) { _socket.Dispose(); } _disposed = true; } } }