using MediaBrowser.Common.Implementations.NetworkManagement; using MediaBrowser.Common.Net; using MediaBrowser.Model.Logging; using System; using System.Net; using System.Net.Sockets; using System.Reactive.Linq; using System.Text; using System.Threading.Tasks; namespace MediaBrowser.Server.Implementations.Udp { /// /// Provides a Udp Server /// public class UdpServer : IUdpServer { /// /// Occurs when [message received]. /// public event EventHandler MessageReceived; /// /// Gets or sets the logger. /// /// The logger. private ILogger Logger { get; set; } /// /// Initializes a new instance of the class. /// /// The logger. public UdpServer(ILogger logger) { Logger = logger; } /// /// Raises the event. /// /// The instance containing the event data. protected virtual void OnMessageReceived(UdpMessageReceivedEventArgs e) { EventHandler handler = MessageReceived; if (handler != null) handler(this, e); } /// /// The _udp client /// private UdpClient _udpClient; /// /// Starts the specified port. /// /// The port. public void Start(int port) { _udpClient = new UdpClient(new IPEndPoint(IPAddress.Any, port)); _udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); CreateObservable().Subscribe(OnMessageReceived); } /// /// Creates the observable. /// /// IObservable{UdpReceiveResult}. private IObservable CreateObservable() { return Observable.Create(obs => Observable.FromAsync(() => { try { return _udpClient.ReceiveAsync(); } catch (ObjectDisposedException) { return Task.FromResult(new UdpReceiveResult(new byte[]{}, new IPEndPoint(IPAddress.Any, 0))); } catch (Exception ex) { Logger.ErrorException("Error receiving udp message", ex); return Task.FromResult(new UdpReceiveResult(new byte[] { }, new IPEndPoint(IPAddress.Any, 0))); } }) .Subscribe(obs)) .Repeat() .Retry() .Publish() .RefCount(); } /// /// Called when [message received]. /// /// The message. private void OnMessageReceived(UdpReceiveResult message) { if (message.RemoteEndPoint.Port == 0) { return; } var bytes = message.Buffer; OnMessageReceived(new UdpMessageReceivedEventArgs { Bytes = bytes, RemoteEndPoint = message.RemoteEndPoint.ToString() }); } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Stops this instance. /// public void Stop() { if (_udpClient != null) { _udpClient.Close(); } } /// /// Releases unmanaged and - optionally - managed resources. /// /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { if (dispose) { Stop(); } } /// /// Sends the async. /// /// The data. /// The ip address. /// The port. /// Task{System.Int32}. /// data public Task SendAsync(string data, string ipAddress, int port) { return SendAsync(Encoding.UTF8.GetBytes(data), ipAddress, port); } /// /// Sends the async. /// /// The bytes. /// The ip address. /// The port. /// Task{System.Int32}. /// bytes public Task SendAsync(byte[] bytes, string ipAddress, int port) { if (bytes == null) { throw new ArgumentNullException("bytes"); } if (string.IsNullOrEmpty(ipAddress)) { throw new ArgumentNullException("ipAddress"); } return _udpClient.SendAsync(bytes, bytes.Length, ipAddress, port); } /// /// Sends the async. /// /// The bytes. /// The remote end point. /// Task. public async Task SendAsync(byte[] bytes, string remoteEndPoint) { if (bytes == null) { throw new ArgumentNullException("bytes"); } if (string.IsNullOrEmpty(remoteEndPoint)) { throw new ArgumentNullException("remoteEndPoint"); } await _udpClient.SendAsync(bytes, bytes.Length, new NetworkManager().Parse(remoteEndPoint)).ConfigureAwait(false); Logger.Info("Udp message sent to {0}", remoteEndPoint); } } }