Fix websocket handling

This commit is contained in:
Bond-009 2019-12-27 14:42:53 +01:00 committed by Bond_009
parent 5ca68f9623
commit 4d311870d2
3 changed files with 33 additions and 50 deletions

View file

@ -99,7 +99,6 @@ namespace Emby.Server.Implementations.HttpServer
/// <param name="message">The message.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task.</returns>
/// <exception cref="ArgumentNullException">message</exception>
public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken)
{
var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
@ -117,7 +116,6 @@ namespace Emby.Server.Implementations.HttpServer
{
// Allocate at least 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(512);
receiveresult = await _socket.ReceiveAsync(memory, cancellationToken);
int bytesRead = receiveresult.Count;
if (bytesRead == 0)
@ -144,33 +142,30 @@ namespace Emby.Server.Implementations.HttpServer
}
} while (_socket.State == WebSocketState.Open && receiveresult.MessageType != WebSocketMessageType.Close);
if (_socket.State == WebSocketState.Open)
{
_logger.LogWarning("Stopped reading from websocket before it was closed");
}
Closed?.Invoke(this, EventArgs.Empty);
_socket.Dispose();
await _socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
string.Empty,
cancellationToken).ConfigureAwait(false);
}
private async Task ProcessInternal(PipeReader reader)
{
ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;
if (OnReceive == null)
{
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.End);
return;
}
WebSocketMessage<object> stub;
try
{
var result = await reader.ReadAsync().ConfigureAwait(false);
if (!result.IsCompleted)
{
return;
}
WebSocketMessage<object> stub;
var buffer = result.Buffer;
if (buffer.IsSingleSegment)
{
stub = JsonSerializer.Deserialize<WebSocketMessage<object>>(buffer.FirstSpan, _jsonOptions);
@ -188,46 +183,36 @@ namespace Emby.Server.Implementations.HttpServer
ArrayPool<byte>.Shared.Return(buf);
}
}
var info = new WebSocketMessageInfo
{
MessageType = stub.MessageType,
Data = stub.Data.ToString(),
Connection = this
};
await OnReceive(info).ConfigureAwait(false);
}
catch (JsonException ex)
{
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.End);
_logger.LogError(ex, "Error processing web socket message");
}
}
/// <inheritdoc />
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
/// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual void Dispose(bool dispose)
{
if (_disposed)
{
return;
}
if (dispose)
{
_socket.Dispose();
}
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.End);
_disposed = true;
_logger.LogDebug("WS received message: {@Message}", stub);
var info = new WebSocketMessageInfo
{
MessageType = stub.MessageType,
Data = stub.Data?.ToString(), // Data can be null
Connection = this
};
_logger.LogDebug("WS message info: {@MessageInfo}", info);
await OnReceive(info).ConfigureAwait(false);
// Stop reading if there's no more data coming
if (result.IsCompleted)
{
return;
}
}
}
}

View file

@ -57,7 +57,6 @@ namespace Emby.Server.Implementations.Session
_logger.LogDebug("Removing websocket from session {Session}", _session.Id);
_sockets.Remove(connection);
connection.Closed -= OnConnectionClosed;
connection.Dispose();
_sessionManager.CloseIfNeeded(_session);
}
@ -96,7 +95,6 @@ namespace Emby.Server.Implementations.Session
foreach (var socket in _sockets)
{
socket.Closed -= OnConnectionClosed;
socket.Dispose();
}
_disposed = true;

View file

@ -10,7 +10,7 @@ using Microsoft.AspNetCore.Http;
namespace MediaBrowser.Controller.Net
{
public interface IWebSocketConnection : IDisposable
public interface IWebSocketConnection
{
/// <summary>
/// Occurs when [closed].