diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index b0c6d0aa08..7a316b070c 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -1,6 +1,5 @@ -using System.Collections.Generic; using System; -using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Net.WebSockets; using System.Threading; @@ -26,9 +25,9 @@ namespace Emby.Server.Implementations.Session public readonly int WebSocketLostTimeout = 60; /// - /// The keep-alive timer factor; controls how often the timer will check on the status of the WebSockets. + /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets. /// - public readonly double TimerFactor = 0.2; + public readonly double IntervalFactor = 0.2; /// /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent. @@ -53,14 +52,24 @@ namespace Emby.Server.Implementations.Session private readonly IHttpServer _httpServer; /// - /// The KeepAlive timer. + /// The KeepAlive cancellation token. /// - private Timer _keepAliveTimer; + private CancellationTokenSource _keepAliveCancellationToken; + + /// + /// Lock used for accesing the KeepAlive cancellation token. + /// + private readonly object _keepAliveLock = new object(); /// /// The WebSocket watchlist. /// - private readonly ConcurrentDictionary _webSockets = new ConcurrentDictionary(); + private readonly HashSet _webSockets = new HashSet(); + + /// + /// Lock used for accesing the WebSockets watchlist. + /// + private readonly object _webSocketsLock = new object(); /// /// Initializes a new instance of the class. @@ -113,7 +122,7 @@ namespace Emby.Server.Implementations.Session public void Dispose() { _httpServer.WebSocketConnected -= _serverManager_WebSocketConnected; - StopKeepAliveTimer(); + StopKeepAlive(); } /// @@ -140,6 +149,7 @@ namespace Emby.Server.Implementations.Session private void OnWebSocketClosed(object sender, EventArgs e) { var webSocket = (IWebSocketConnection) sender; + _logger.LogDebug("WebSockets {0} closed.", webSocket); RemoveWebSocket(webSocket); } @@ -147,15 +157,20 @@ namespace Emby.Server.Implementations.Session /// Adds a WebSocket to the KeepAlive watchlist. /// /// The WebSocket to monitor. - private async void KeepAliveWebSocket(IWebSocketConnection webSocket) + private async Task KeepAliveWebSocket(IWebSocketConnection webSocket) { - if (!_webSockets.TryAdd(webSocket, 0)) + lock (_webSocketsLock) { - _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket); - return; + if (!_webSockets.Add(webSocket)) + { + _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket); + return; + } + webSocket.Closed += OnWebSocketClosed; + webSocket.LastKeepAliveDate = DateTime.UtcNow; + + StartKeepAlive(); } - webSocket.Closed += OnWebSocketClosed; - webSocket.LastKeepAliveDate = DateTime.UtcNow; // Notify WebSocket about timeout try @@ -164,10 +179,8 @@ namespace Emby.Server.Implementations.Session } catch (WebSocketException exception) { - _logger.LogWarning(exception, "Error sending ForceKeepAlive message to WebSocket."); + _logger.LogWarning(exception, "Error sending ForceKeepAlive message to WebSocket {0}.", webSocket); } - - StartKeepAliveTimer(); } /// @@ -176,88 +189,131 @@ namespace Emby.Server.Implementations.Session /// The WebSocket to remove. private void RemoveWebSocket(IWebSocketConnection webSocket) { - webSocket.Closed -= OnWebSocketClosed; - _webSockets.TryRemove(webSocket, out _); - } - - /// - /// Starts the KeepAlive timer. - /// - private void StartKeepAliveTimer() - { - if (_keepAliveTimer == null) + lock (_webSocketsLock) { - _keepAliveTimer = new Timer( - KeepAliveSockets, - null, - TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor), - TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor) - ); + if (!_webSockets.Remove(webSocket)) + { + _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket); + } + else + { + webSocket.Closed -= OnWebSocketClosed; + } } } /// - /// Stops the KeepAlive timer. + /// Starts the KeepAlive watcher. /// - private void StopKeepAliveTimer() + private void StartKeepAlive() { - if (_keepAliveTimer != null) + lock (_keepAliveLock) { - _keepAliveTimer.Dispose(); - _keepAliveTimer = null; - } - - foreach (var pair in _webSockets) - { - pair.Key.Closed -= OnWebSocketClosed; + if (_keepAliveCancellationToken == null) + { + _keepAliveCancellationToken = new CancellationTokenSource(); + // Start KeepAlive watcher + KeepAliveSockets( + TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor), + _keepAliveCancellationToken.Token); + } } } /// - /// Checks status of KeepAlive of WebSockets. + /// Stops the KeepAlive watcher. /// - /// The state. - private async void KeepAliveSockets(object state) + private void StopKeepAlive() { - var inactive = _webSockets.Keys.Where(i => + lock (_keepAliveLock) { - var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds; - return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout); - }); - var lost = _webSockets.Keys.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout); - - if (inactive.Any()) - { - _logger.LogDebug("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count()); + if (_keepAliveCancellationToken != null) + { + _keepAliveCancellationToken.Cancel(); + _keepAliveCancellationToken = null; + } } - foreach (var webSocket in inactive) + lock (_webSocketsLock) { + foreach (var webSocket in _webSockets) + { + webSocket.Closed -= OnWebSocketClosed; + } + _webSockets.Clear(); + } + } + + /// + /// Checks status of KeepAlive of WebSockets once every the specified interval time. + /// + /// The interval. + /// The cancellation token. + private async Task KeepAliveSockets(TimeSpan interval, CancellationToken cancellationToken) + { + while (true) + { + _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count()); + + IEnumerable inactive; + IEnumerable lost; + lock (_webSocketsLock) + { + inactive = _webSockets.Where(i => + { + var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds; + return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout); + }); + lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout); + } + + if (inactive.Any()) + { + _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count()); + } + + foreach (var webSocket in inactive) + { + try + { + await SendForceKeepAlive(webSocket); + } + catch (WebSocketException exception) + { + _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket."); + lost = lost.Append(webSocket); + } + } + + lock (_webSocketsLock) + { + if (lost.Any()) + { + _logger.LogInformation("Lost {0} WebSockets.", lost.Count()); + foreach (var webSocket in lost.ToList()) + { + // TODO: handle session relative to the lost webSocket + RemoveWebSocket(webSocket); + } + } + + if (!_webSockets.Any()) + { + StopKeepAlive(); + } + } + + // Wait for next interval + Task task = Task.Delay(interval, cancellationToken); try { - await SendForceKeepAlive(webSocket); + await task; } - catch (WebSocketException exception) + catch (TaskCanceledException) { - _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket."); - lost.Append(webSocket); + return; } } - - if (lost.Any()) - { - _logger.LogInformation("Lost {0} WebSockets.", lost.Count()); - foreach (var webSocket in lost) - { - // TODO: handle session relative to the lost webSocket - RemoveWebSocket(webSocket); - } - } - - if (!_webSockets.Any()) - { - StopKeepAliveTimer(); - } } /// diff --git a/Emby.Server.Implementations/Syncplay/SyncplayManager.cs b/Emby.Server.Implementations/Syncplay/SyncplayManager.cs index 5aefd1fd98..eb61da7f32 100644 --- a/Emby.Server.Implementations/Syncplay/SyncplayManager.cs +++ b/Emby.Server.Implementations/Syncplay/SyncplayManager.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Linq; @@ -42,14 +41,19 @@ namespace Emby.Server.Implementations.Syncplay /// /// The map between sessions and groups. /// - private readonly ConcurrentDictionary _sessionToGroupMap = - new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _sessionToGroupMap = + new Dictionary(StringComparer.OrdinalIgnoreCase); /// /// The groups. /// - private readonly ConcurrentDictionary _groups = - new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _groups = + new Dictionary(StringComparer.OrdinalIgnoreCase); + + /// + /// Lock used for accesing any group. + /// + private readonly object _groupsLock = new object(); private bool _disposed = false; @@ -175,15 +179,18 @@ namespace Emby.Server.Implementations.Syncplay return; } - if (IsSessionInGroup(session)) + lock (_groupsLock) { - LeaveGroup(session); + if (IsSessionInGroup(session)) + { + LeaveGroup(session); + } + + var group = new SyncplayController(_logger, _sessionManager, this); + _groups[group.GetGroupId().ToString()] = group; + + group.InitGroup(session); } - - var group = new SyncplayController(_logger, _sessionManager, this); - _groups[group.GetGroupId().ToString()] = group; - - group.InitGroup(session); } /// @@ -203,67 +210,73 @@ namespace Emby.Server.Implementations.Syncplay return; } - ISyncplayController group; - _groups.TryGetValue(groupId, out group); - - if (group == null) + lock (_groupsLock) { - _logger.LogWarning("Syncplaymanager JoinGroup: {0} tried to join group {0} that does not exist.", session.Id, groupId); + ISyncplayController group; + _groups.TryGetValue(groupId, out group); - var error = new GroupUpdate() + if (group == null) { - Type = GroupUpdateType.GroupNotJoined - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; - } + _logger.LogWarning("Syncplaymanager JoinGroup: {0} tried to join group {0} that does not exist.", session.Id, groupId); - if (!HasAccessToItem(user, group.GetPlayingItemId())) - { - _logger.LogWarning("Syncplaymanager JoinGroup: {0} does not have access to {1}.", session.Id, group.GetPlayingItemId()); + var error = new GroupUpdate() + { + Type = GroupUpdateType.GroupNotJoined + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } - var error = new GroupUpdate() + if (!HasAccessToItem(user, group.GetPlayingItemId())) { - GroupId = group.GetGroupId().ToString(), - Type = GroupUpdateType.LibraryAccessDenied - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; - } + _logger.LogWarning("Syncplaymanager JoinGroup: {0} does not have access to {1}.", session.Id, group.GetPlayingItemId()); - if (IsSessionInGroup(session)) - { - if (GetSessionGroup(session).Equals(groupId)) return; - LeaveGroup(session); - } + var error = new GroupUpdate() + { + GroupId = group.GetGroupId().ToString(), + Type = GroupUpdateType.LibraryAccessDenied + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } - group.SessionJoin(session, request); + if (IsSessionInGroup(session)) + { + if (GetSessionGroup(session).Equals(groupId)) return; + LeaveGroup(session); + } + + group.SessionJoin(session, request); + } } /// public void LeaveGroup(SessionInfo session) { // TODO: determine what happens to users that are in a group and get their permissions revoked - - ISyncplayController group; - _sessionToGroupMap.TryGetValue(session.Id, out group); - - if (group == null) + lock (_groupsLock) { - _logger.LogWarning("Syncplaymanager LeaveGroup: {0} does not belong to any group.", session.Id); + ISyncplayController group; + _sessionToGroupMap.TryGetValue(session.Id, out group); - var error = new GroupUpdate() + if (group == null) { - Type = GroupUpdateType.NotInGroup - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; - } - group.SessionLeave(session); + _logger.LogWarning("Syncplaymanager LeaveGroup: {0} does not belong to any group.", session.Id); - if (group.IsGroupEmpty()) - { - _groups.Remove(group.GetGroupId().ToString(), out _); + var error = new GroupUpdate() + { + Type = GroupUpdateType.NotInGroup + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } + + group.SessionLeave(session); + + if (group.IsGroupEmpty()) + { + _groups.Remove(group.GetGroupId().ToString(), out _); + } } } @@ -314,21 +327,25 @@ namespace Emby.Server.Implementations.Syncplay return; } - ISyncplayController group; - _sessionToGroupMap.TryGetValue(session.Id, out group); - - if (group == null) + lock (_groupsLock) { - _logger.LogWarning("Syncplaymanager HandleRequest: {0} does not belong to any group.", session.Id); + ISyncplayController group; + _sessionToGroupMap.TryGetValue(session.Id, out group); - var error = new GroupUpdate() + if (group == null) { - Type = GroupUpdateType.NotInGroup - }; - _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); - return; + _logger.LogWarning("Syncplaymanager HandleRequest: {0} does not belong to any group.", session.Id); + + var error = new GroupUpdate() + { + Type = GroupUpdateType.NotInGroup + }; + _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), error, CancellationToken.None); + return; + } + + group.HandleRequest(session, request); } - group.HandleRequest(session, request); } /// diff --git a/MediaBrowser.Api/Syncplay/TimeSyncService.cs b/MediaBrowser.Api/Syncplay/TimeSyncService.cs index 8974130157..930968d9fc 100644 --- a/MediaBrowser.Api/Syncplay/TimeSyncService.cs +++ b/MediaBrowser.Api/Syncplay/TimeSyncService.cs @@ -9,7 +9,6 @@ using Microsoft.Extensions.Logging; namespace MediaBrowser.Api.Syncplay { [Route("/GetUtcTime", "GET", Summary = "Get UtcTime")] - [Authenticated] public class GetUtcTime : IReturnVoid { // Nothing @@ -33,13 +32,10 @@ namespace MediaBrowser.Api.Syncplay public TimeSyncService( ILogger logger, IServerConfigurationManager serverConfigurationManager, - IHttpResultFactory httpResultFactory, - ISessionManager sessionManager, - ISessionContext sessionContext) + IHttpResultFactory httpResultFactory) : base(logger, serverConfigurationManager, httpResultFactory) { - _sessionManager = sessionManager; - _sessionContext = sessionContext; + // Do nothing } ///