Make more things async

This commit is contained in:
Bond_009 2019-02-09 15:39:17 +01:00
parent 2fc97212a7
commit 449074e73f
6 changed files with 163 additions and 178 deletions

View file

@ -43,10 +43,11 @@ namespace Jellyfin.Server.SocketSharp
socket.OnMessage += socket_OnMessage;
socket.OnClose += socket_OnClose;
socket.OnError += socket_OnError;
WebSocket.ConnectAsServer();
}
public Task ConnectAsServerAsync()
=> WebSocket.ConnectAsServer();
public Task StartReceive()
{
return _taskCompletionSource.Task;
@ -133,7 +134,7 @@ namespace Jellyfin.Server.SocketSharp
_cancellationTokenSource.Cancel();
WebSocket.Close();
WebSocket.CloseAsync().GetAwaiter().GetResult();
}
}

View file

@ -34,9 +34,16 @@ namespace Jellyfin.Server.SocketSharp
private CancellationTokenSource _disposeCancellationTokenSource = new CancellationTokenSource();
private CancellationToken _disposeCancellationToken;
public WebSocketSharpListener(ILogger logger, X509Certificate certificate, IStreamHelper streamHelper,
INetworkManager networkManager, ISocketFactory socketFactory, ICryptoProvider cryptoProvider,
bool enableDualMode, IFileSystem fileSystem, IEnvironmentInfo environment)
public WebSocketSharpListener(
ILogger logger,
X509Certificate certificate,
IStreamHelper streamHelper,
INetworkManager networkManager,
ISocketFactory socketFactory,
ICryptoProvider cryptoProvider,
bool enableDualMode,
IFileSystem fileSystem,
IEnvironmentInfo environment)
{
_logger = logger;
_certificate = certificate;
@ -61,7 +68,9 @@ namespace Jellyfin.Server.SocketSharp
public void Start(IEnumerable<string> urlPrefixes)
{
if (_listener == null)
{
_listener = new HttpListener(_logger, _cryptoProvider, _socketFactory, _networkManager, _streamHelper, _fileSystem, _environment);
}
_listener.EnableDualMode = _enableDualMode;
@ -70,22 +79,14 @@ namespace Jellyfin.Server.SocketSharp
_listener.LoadCert(_certificate);
}
foreach (var prefix in urlPrefixes)
{
_logger.LogInformation("Adding HttpListener prefix " + prefix);
_listener.Prefixes.Add(prefix);
}
_logger.LogInformation("Adding HttpListener prefixes {Prefixes}", urlPrefixes);
_listener.Prefixes.AddRange(urlPrefixes);
_listener.OnContext = ProcessContext;
_listener.OnContext = async c => await InitTask(c, _disposeCancellationToken).ConfigureAwait(false);
_listener.Start();
}
private void ProcessContext(HttpListenerContext context)
{
var _ = Task.Run(async () => await InitTask(context, _disposeCancellationToken));
}
private static void LogRequest(ILogger logger, HttpListenerRequest request)
{
var url = request.Url.ToString();
@ -139,10 +140,7 @@ namespace Jellyfin.Server.SocketSharp
Endpoint = endpoint
};
if (WebSocketConnecting != null)
{
WebSocketConnecting(connectingArgs);
}
WebSocketConnecting?.Invoke(connectingArgs);
if (connectingArgs.AllowConnection)
{
@ -153,6 +151,7 @@ namespace Jellyfin.Server.SocketSharp
if (WebSocketConnected != null)
{
var socket = new SharpWebSocket(webSocketContext.WebSocket, _logger);
await socket.ConnectAsServerAsync().ConfigureAwait(false);
WebSocketConnected(new WebSocketConnectEventArgs
{
@ -162,7 +161,7 @@ namespace Jellyfin.Server.SocketSharp
Endpoint = endpoint
});
await ReceiveWebSocket(ctx, socket).ConfigureAwait(false);
await ReceiveWebSocketAsync(ctx, socket).ConfigureAwait(false);
}
}
else
@ -180,7 +179,7 @@ namespace Jellyfin.Server.SocketSharp
}
}
private async Task ReceiveWebSocket(HttpListenerContext ctx, SharpWebSocket socket)
private async Task ReceiveWebSocketAsync(HttpListenerContext ctx, SharpWebSocket socket)
{
try
{

View file

@ -4,7 +4,6 @@ using System.IO;
using System.IO.Compression;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Services;
using HttpStatusCode = SocketHttpListener.Net.HttpStatusCode;
@ -75,27 +74,6 @@ namespace SocketHttpListener
}
}
private static byte[] readBytes(this Stream stream, byte[] buffer, int offset, int length)
{
var len = stream.Read(buffer, offset, length);
if (len < 1)
return buffer.SubArray(0, offset);
var tmp = 0;
while (len < length)
{
tmp = stream.Read(buffer, offset + len, length - len);
if (tmp < 1)
break;
len += tmp;
}
return len < length
? buffer.SubArray(0, offset + len)
: buffer;
}
private static async Task<byte[]> ReadBytesAsync(this Stream stream, byte[] buffer, int offset, int length)
{
var len = await stream.ReadAsync(buffer, offset, length).ConfigureAwait(false);
@ -119,15 +97,6 @@ namespace SocketHttpListener
: buffer;
}
private static bool readBytes(this Stream stream, byte[] buffer, int offset, int length, Stream dest)
{
var bytes = stream.readBytes(buffer, offset, length);
var len = bytes.Length;
dest.Write(bytes, 0, len);
return len == offset + length;
}
private static async Task<bool> ReadBytesAsync(this Stream stream, byte[] buffer, int offset, int length, Stream dest)
{
var bytes = await stream.ReadBytesAsync(buffer, offset, length).ConfigureAwait(false);
@ -141,16 +110,16 @@ namespace SocketHttpListener
#region Internal Methods
internal static byte[] Append(this ushort code, string reason)
internal static async Task<byte[]> AppendAsync(this ushort code, string reason)
{
using (var buffer = new MemoryStream())
{
var tmp = code.ToByteArrayInternally(ByteOrder.Big);
buffer.Write(tmp, 0, 2);
await buffer.WriteAsync(tmp, 0, 2).ConfigureAwait(false);
if (reason != null && reason.Length > 0)
{
tmp = Encoding.UTF8.GetBytes(reason);
buffer.Write(tmp, 0, tmp.Length);
await buffer.WriteAsync(tmp, 0, tmp.Length).ConfigureAwait(false);
}
return buffer.ToArray();

View file

@ -36,6 +36,25 @@ namespace SocketHttpListener.Net
HttpEndPointManager.AddPrefix(_logger, uriPrefix, listener);
}
public void AddRange(IEnumerable<string> uriPrefixes)
{
listener.CheckDisposed();
//ListenerPrefix.CheckUri(uriPrefix);
foreach (var uriPrefix in uriPrefixes)
{
if (prefixes.Contains(uriPrefix))
{
continue;
}
prefixes.Add(uriPrefix);
if (listener.IsListening)
{
HttpEndPointManager.AddPrefix(_logger, uriPrefix, listener);
}
}
}
public void Clear()
{
listener.CheckDisposed();

View file

@ -30,9 +30,9 @@ namespace SocketHttpListener
private CookieCollection _cookies;
private AutoResetEvent _exitReceiving;
private object _forConn;
private object _forEvent;
private readonly SemaphoreSlim _forEvent = new SemaphoreSlim(1, 1);
private object _forMessageEventQueue;
private object _forSend;
private readonly SemaphoreSlim _forSend = new SemaphoreSlim(1, 1);
private const string _guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private Queue<MessageEventArgs> _messageEventQueue;
private string _protocol;
@ -109,12 +109,15 @@ namespace SocketHttpListener
#region Private Methods
private void close(CloseStatusCode code, string reason, bool wait)
private async Task CloseAsync(CloseStatusCode code, string reason, bool wait)
{
close(new PayloadData(((ushort)code).Append(reason)), !code.IsReserved(), wait);
await CloseAsync(new PayloadData(
await ((ushort)code).AppendAsync(reason).ConfigureAwait(false)),
!code.IsReserved(),
wait).ConfigureAwait(false);
}
private void close(PayloadData payload, bool send, bool wait)
private async Task CloseAsync(PayloadData payload, bool send, bool wait)
{
lock (_forConn)
{
@ -126,11 +129,12 @@ namespace SocketHttpListener
_readyState = WebSocketState.CloseSent;
}
var e = new CloseEventArgs(payload);
e.WasClean =
closeHandshake(
var e = new CloseEventArgs(payload)
{
WasClean = await CloseHandshakeAsync(
send ? WebSocketFrame.CreateCloseFrame(Mask.Unmask, payload).ToByteArray() : null,
wait ? 1000 : 0);
wait ? 1000 : 0).ConfigureAwait(false)
};
_readyState = WebSocketState.Closed;
try
@ -143,9 +147,9 @@ namespace SocketHttpListener
}
}
private bool closeHandshake(byte[] frameAsBytes, int millisecondsTimeout)
private async Task<bool> CloseHandshakeAsync(byte[] frameAsBytes, int millisecondsTimeout)
{
var sent = frameAsBytes != null && writeBytes(frameAsBytes);
var sent = frameAsBytes != null && await WriteBytesAsync(frameAsBytes).ConfigureAwait(false);
var received =
millisecondsTimeout == 0 ||
(sent && _exitReceiving != null && _exitReceiving.WaitOne(millisecondsTimeout));
@ -221,7 +225,7 @@ namespace SocketHttpListener
// CLOSE
if (frame.IsClose)
return processCloseFrame(frame);
return await ProcessCloseFrameAsync(frame).ConfigureAwait(false);
}
else
{
@ -236,10 +240,10 @@ namespace SocketHttpListener
}
// ?
return processUnsupportedFrame(
return await ProcessUnsupportedFrameAsync(
frame,
CloseStatusCode.IncorrectData,
"An incorrect data has been received while receiving fragmented data.");
"An incorrect data has been received while receiving fragmented data.").ConfigureAwait(false);
}
return true;
@ -299,44 +303,42 @@ namespace SocketHttpListener
_compression = CompressionMethod.None;
_cookies = new CookieCollection();
_forConn = new object();
_forEvent = new object();
_forSend = new object();
_messageEventQueue = new Queue<MessageEventArgs>();
_forMessageEventQueue = ((ICollection)_messageEventQueue).SyncRoot;
_readyState = WebSocketState.Connecting;
}
private void open()
private async Task OpenAsync()
{
try
{
startReceiving();
lock (_forEvent)
{
try
{
if (OnOpen != null)
{
OnOpen(this, EventArgs.Empty);
}
}
catch (Exception ex)
{
processException(ex, "An exception has occurred while OnOpen.");
}
}
}
catch (Exception ex)
{
processException(ex, "An exception has occurred while opening.");
await ProcessExceptionAsync(ex, "An exception has occurred while opening.").ConfigureAwait(false);
}
await _forEvent.WaitAsync().ConfigureAwait(false);
try
{
OnOpen?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
await ProcessExceptionAsync(ex, "An exception has occurred while OnOpen.").ConfigureAwait(false);
}
finally
{
_forEvent.Release();
}
}
private bool processCloseFrame(WebSocketFrame frame)
private async Task<bool> ProcessCloseFrameAsync(WebSocketFrame frame)
{
var payload = frame.PayloadData;
close(payload, !payload.ContainsReservedCloseStatusCode, false);
await CloseAsync(payload, !payload.ContainsReservedCloseStatusCode, false).ConfigureAwait(false);
return false;
}
@ -352,7 +354,7 @@ namespace SocketHttpListener
return true;
}
private void processException(Exception exception, string message)
private async Task ProcessExceptionAsync(Exception exception, string message)
{
var code = CloseStatusCode.Abnormal;
var reason = message;
@ -365,9 +367,13 @@ namespace SocketHttpListener
error(message ?? code.GetMessage(), exception);
if (_readyState == WebSocketState.Connecting)
Close(HttpStatusCode.BadRequest);
{
await CloseAsync(HttpStatusCode.BadRequest).ConfigureAwait(false);
}
else
close(code, reason ?? code.GetMessage(), false);
{
await CloseAsync(code, reason ?? code.GetMessage(), false).ConfigureAwait(false);
}
}
private Task<bool> ProcessFragmentedFrameAsync(WebSocketFrame frame)
@ -414,36 +420,37 @@ namespace SocketHttpListener
return true;
}
private bool processUnsupportedFrame(WebSocketFrame frame, CloseStatusCode code, string reason)
private async Task<bool> ProcessUnsupportedFrameAsync(WebSocketFrame frame, CloseStatusCode code, string reason)
{
processException(new WebSocketException(code, reason), null);
await ProcessExceptionAsync(new WebSocketException(code, reason), null).ConfigureAwait(false);
return false;
}
private async Task<bool> ProcessWebSocketFrameAsync(WebSocketFrame frame)
private Task<bool> ProcessWebSocketFrameAsync(WebSocketFrame frame)
{
return frame.IsCompressed && _compression == CompressionMethod.None
? processUnsupportedFrame(
? ProcessUnsupportedFrameAsync(
frame,
CloseStatusCode.IncorrectData,
"A compressed data has been received without available decompression method.")
: frame.IsFragmented
? await ProcessFragmentedFrameAsync(frame).ConfigureAwait(false)
? ProcessFragmentedFrameAsync(frame)
: frame.IsData
? processDataFrame(frame)
? Task.FromResult(processDataFrame(frame))
: frame.IsPing
? processPingFrame(frame)
? Task.FromResult(processPingFrame(frame))
: frame.IsPong
? processPongFrame(frame)
? Task.FromResult(processPongFrame(frame))
: frame.IsClose
? processCloseFrame(frame)
: processUnsupportedFrame(frame, CloseStatusCode.PolicyViolation, null);
? ProcessCloseFrameAsync(frame)
: ProcessUnsupportedFrameAsync(frame, CloseStatusCode.PolicyViolation, null);
}
private bool send(Opcode opcode, Stream stream)
private async Task<bool> SendAsync(Opcode opcode, Stream stream)
{
lock (_forSend)
await _forSend.WaitAsync().ConfigureAwait(false);
try
{
var src = stream;
var compressed = false;
@ -456,7 +463,7 @@ namespace SocketHttpListener
compressed = true;
}
sent = send(opcode, Mask.Unmask, stream, compressed);
sent = await SendAsync(opcode, Mask.Unmask, stream, compressed).ConfigureAwait(false);
if (!sent)
error("Sending a data has been interrupted.");
}
@ -474,16 +481,20 @@ namespace SocketHttpListener
return sent;
}
finally
{
_forSend.Release();
}
}
private bool send(Opcode opcode, Mask mask, Stream stream, bool compressed)
private async Task<bool> SendAsync(Opcode opcode, Mask mask, Stream stream, bool compressed)
{
var len = stream.Length;
/* Not fragmented */
if (len == 0)
return send(Fin.Final, opcode, mask, new byte[0], compressed);
return await SendAsync(Fin.Final, opcode, mask, new byte[0], compressed).ConfigureAwait(false);
var quo = len / FragmentLength;
var rem = (int)(len % FragmentLength);
@ -492,26 +503,26 @@ namespace SocketHttpListener
if (quo == 0)
{
buff = new byte[rem];
return stream.Read(buff, 0, rem) == rem &&
send(Fin.Final, opcode, mask, buff, compressed);
return await stream.ReadAsync(buff, 0, rem).ConfigureAwait(false) == rem &&
await SendAsync(Fin.Final, opcode, mask, buff, compressed).ConfigureAwait(false);
}
buff = new byte[FragmentLength];
if (quo == 1 && rem == 0)
return stream.Read(buff, 0, FragmentLength) == FragmentLength &&
send(Fin.Final, opcode, mask, buff, compressed);
return await stream.ReadAsync(buff, 0, FragmentLength).ConfigureAwait(false) == FragmentLength &&
await SendAsync(Fin.Final, opcode, mask, buff, compressed).ConfigureAwait(false);
/* Send fragmented */
// Begin
if (stream.Read(buff, 0, FragmentLength) != FragmentLength ||
!send(Fin.More, opcode, mask, buff, compressed))
if (await stream.ReadAsync(buff, 0, FragmentLength).ConfigureAwait(false) != FragmentLength ||
!await SendAsync(Fin.More, opcode, mask, buff, compressed).ConfigureAwait(false))
return false;
var n = rem == 0 ? quo - 2 : quo - 1;
for (long i = 0; i < n; i++)
if (stream.Read(buff, 0, FragmentLength) != FragmentLength ||
!send(Fin.More, Opcode.Cont, mask, buff, compressed))
if (await stream.ReadAsync(buff, 0, FragmentLength).ConfigureAwait(false) != FragmentLength ||
!await SendAsync(Fin.More, Opcode.Cont, mask, buff, compressed).ConfigureAwait(false))
return false;
// End
@ -520,47 +531,27 @@ namespace SocketHttpListener
else
buff = new byte[rem];
return stream.Read(buff, 0, rem) == rem &&
send(Fin.Final, Opcode.Cont, mask, buff, compressed);
return await stream.ReadAsync(buff, 0, rem).ConfigureAwait(false) == rem &&
await SendAsync(Fin.Final, Opcode.Cont, mask, buff, compressed).ConfigureAwait(false);
}
private bool send(Fin fin, Opcode opcode, Mask mask, byte[] data, bool compressed)
private Task<bool> SendAsync(Fin fin, Opcode opcode, Mask mask, byte[] data, bool compressed)
{
lock (_forConn)
{
if (_readyState != WebSocketState.Open)
{
return false;
return Task.FromResult(false);
}
return writeBytes(
return WriteBytesAsync(
WebSocketFrame.CreateWebSocketFrame(fin, opcode, mask, data, compressed).ToByteArray());
}
}
private Task sendAsync(Opcode opcode, Stream stream)
{
var completionSource = new TaskCompletionSource<bool>();
Task.Run(() =>
{
try
{
send(opcode, stream);
completionSource.TrySetResult(true);
}
catch (Exception ex)
{
completionSource.TrySetException(ex);
}
});
return completionSource.Task;
}
// As server
private bool sendHttpResponse(HttpResponse response)
{
return writeBytes(response.ToByteArray());
}
private Task<bool> SendHttpResponseAsync(HttpResponse response)
=> WriteBytesAsync(response.ToByteArray());
private void startReceiving()
{
@ -583,37 +574,45 @@ namespace SocketHttpListener
receive();
if (!frame.IsData)
return;
lock (_forEvent)
{
try
return;
}
await _forEvent.WaitAsync().ConfigureAwait(false);
try
{
var e = dequeueFromMessageEventQueue();
if (e != null && _readyState == WebSocketState.Open)
{
var e = dequeueFromMessageEventQueue();
if (e != null && _readyState == WebSocketState.Open)
OnMessage.Emit(this, e);
}
catch (Exception ex)
{
processException(ex, "An exception has occurred while OnMessage.");
OnMessage.Emit(this, e);
}
}
catch (Exception ex)
{
await ProcessExceptionAsync(ex, "An exception has occurred while OnMessage.").ConfigureAwait(false);
}
finally
{
_forEvent.Release();
}
}
else if (_exitReceiving != null)
{
_exitReceiving.Set();
}
},
ex => processException(ex, "An exception has occurred while receiving a message."));
async ex => await ProcessExceptionAsync(ex, "An exception has occurred while receiving a message.")).ConfigureAwait(false);
receive();
}
private bool writeBytes(byte[] data)
private async Task<bool> WriteBytesAsync(byte[] data)
{
try
{
_stream.Write(data, 0, data.Length);
await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
return true;
}
catch (Exception)
@ -627,10 +626,10 @@ namespace SocketHttpListener
#region Internal Methods
// As server
internal void Close(HttpResponse response)
internal async Task CloseAsync(HttpResponse response)
{
_readyState = WebSocketState.CloseSent;
sendHttpResponse(response);
await SendHttpResponseAsync(response).ConfigureAwait(false);
closeServerResources();
@ -638,22 +637,20 @@ namespace SocketHttpListener
}
// As server
internal void Close(HttpStatusCode code)
{
Close(createHandshakeCloseResponse(code));
}
internal Task CloseAsync(HttpStatusCode code)
=> CloseAsync(createHandshakeCloseResponse(code));
// As server
public void ConnectAsServer()
public async Task ConnectAsServer()
{
try
{
_readyState = WebSocketState.Open;
open();
await OpenAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
processException(ex, "An exception has occurred while connecting.");
await ProcessExceptionAsync(ex, "An exception has occurred while connecting.").ConfigureAwait(false);
}
}
@ -664,18 +661,18 @@ namespace SocketHttpListener
/// <summary>
/// Closes the WebSocket connection, and releases all associated resources.
/// </summary>
public void Close()
public Task CloseAsync()
{
var msg = _readyState.CheckIfClosable();
if (msg != null)
{
error(msg);
return;
return Task.CompletedTask;
}
var send = _readyState == WebSocketState.Open;
close(new PayloadData(), send, send);
return CloseAsync(new PayloadData(), send, send);
}
/// <summary>
@ -693,11 +690,11 @@ namespace SocketHttpListener
/// <param name="reason">
/// A <see cref="string"/> that represents the reason for the close.
/// </param>
public void Close(CloseStatusCode code, string reason)
public async Task CloseAsync(CloseStatusCode code, string reason)
{
byte[] data = null;
var msg = _readyState.CheckIfClosable() ??
(data = ((ushort)code).Append(reason)).CheckIfValidControlData("reason");
(data = await ((ushort)code).AppendAsync(reason).ConfigureAwait(false)).CheckIfValidControlData("reason");
if (msg != null)
{
@ -707,7 +704,7 @@ namespace SocketHttpListener
}
var send = _readyState == WebSocketState.Open && !code.IsReserved();
close(new PayloadData(data), send, send);
await CloseAsync(new PayloadData(data), send, send).ConfigureAwait(false);
}
/// <summary>
@ -732,7 +729,7 @@ namespace SocketHttpListener
throw new Exception(msg);
}
return sendAsync(Opcode.Binary, new MemoryStream(data));
return SendAsync(Opcode.Binary, new MemoryStream(data));
}
/// <summary>
@ -757,7 +754,7 @@ namespace SocketHttpListener
throw new Exception(msg);
}
return sendAsync(Opcode.Text, new MemoryStream(Encoding.UTF8.GetBytes(data)));
return SendAsync(Opcode.Text, new MemoryStream(Encoding.UTF8.GetBytes(data)));
}
#endregion
@ -772,7 +769,7 @@ namespace SocketHttpListener
/// </remarks>
void IDisposable.Dispose()
{
Close(CloseStatusCode.Away, null);
CloseAsync(CloseStatusCode.Away, null).GetAwaiter().GetResult();
}
#endregion

View file

@ -303,10 +303,10 @@ namespace SocketHttpListener
return new WebSocketFrame(Opcode.Close, mask, payload);
}
internal static WebSocketFrame CreateCloseFrame(Mask mask, CloseStatusCode code, string reason)
internal static async Task<WebSocketFrame> CreateCloseFrameAsync(Mask mask, CloseStatusCode code, string reason)
{
return new WebSocketFrame(
Opcode.Close, mask, new PayloadData(((ushort)code).Append(reason)));
Opcode.Close, mask, new PayloadData(await ((ushort)code).AppendAsync(reason).ConfigureAwait(false)));
}
internal static WebSocketFrame CreatePingFrame(Mask mask)