Winify – Rev 84

Subversion Repositories:
Rev:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Newtonsoft.Json;
using Serilog;
using Servers;
using WebSocketSharp;
using WebSocketSharp.Net;
using Winify.Utilities;
using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
using NetworkCredential = System.Net.NetworkCredential;

namespace Winify.Gotify
{
    public class GotifyConnection : IDisposable
    {
        #region Public Events & Delegates

        public event EventHandler<GotifyMessageEventArgs> GotifyMessage;

        #endregion

        #region Private Delegates, Events, Enums, Properties, Indexers and Fields

        private readonly Server _server;

        private CancellationToken _cancellationToken;

        private CancellationTokenSource _cancellationTokenSource;

        private Task _heartBeatTask;

        private HttpClient _httpClient;

        private readonly Uri _webSocketsUri;

        private readonly Uri _httpUri;
        private WebSocket _webSocketSharp;
        private readonly Configuration.Configuration _configuration;
        private IDisposable _tplRetrievePastMessagesLink;
        private IDisposable _tplWebSocketsBufferBlockTransformLink;
        private readonly BufferBlock<GotifyConnectionData> _webSocketMessageBufferBlock;
        private readonly Stopwatch _webSocketsClientPingStopWatch;
        private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation;
        
        private Task _retrievePastMessagesTask;
        private static JsonSerializer _jsonSerializer;
        private readonly CancellationToken _programCancellationToken;
        private CancellationTokenSource _localCancellationTokenSource;
        private CancellationToken _localCancellationToken;

        #endregion

        #region Constructors, Destructors and Finalizers

        private GotifyConnection()
        {
            _jsonSerializer = new JsonSerializer();
            _webSocketsServerResponseScheduledContinuation = new ScheduledContinuation();
            _webSocketsClientPingStopWatch = new Stopwatch();

            _webSocketMessageBufferBlock = new BufferBlock<GotifyConnectionData>(
                new DataflowBlockOptions
                {
                    CancellationToken = _cancellationToken
                });

            var webSocketActionBlock = new ActionBlock<GotifyConnectionData>(async gotifyConnectionData =>
            {
                try
                {
                    using var memoryStream = new MemoryStream(gotifyConnectionData.Payload);
                    using var streamReader = new StreamReader(memoryStream);
                    using var jsonTextReader = new JsonTextReader(streamReader);

                    var gotifyMessage = _jsonSerializer.Deserialize<GotifyMessage>(jsonTextReader) ?? throw new ArgumentNullException();

                    gotifyMessage.Server = gotifyConnectionData.Server;

                    using var imageStream = await RetrieveGotifyApplicationImage(gotifyMessage.AppId);

                    if (imageStream == null || imageStream.Length == 0)
                    {
                        Log.Warning("Could not find any application image for notification.");

                        return;
                    }

                    var image = new Bitmap(imageStream);

                    GotifyMessage?.Invoke(this,
                        new GotifyMessageEventArgs(gotifyMessage, image));

                    Log.Debug($"Notification message received: {gotifyMessage.Message}");
                }
                catch (JsonSerializationException exception)
                {
                    Log.Warning(exception, "Could not deserialize notification message.");
                }
                catch (Exception exception)
                {
                    Log.Warning(exception, "Generic failure.");
                }

            }, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });

            _tplWebSocketsBufferBlockTransformLink = _webSocketMessageBufferBlock.LinkTo(webSocketActionBlock,
                new DataflowLinkOptions { PropagateCompletion = true });
        }

        public GotifyConnection(Server server, Configuration.Configuration configuration, CancellationToken cancellationToken) : this()
        {
            _server = server;
            _configuration = configuration;
            _programCancellationToken = cancellationToken;

            ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
            var httpClientHandler = new HttpClientHandler
            {
                // mono does not implement this
                //SslProtocols = SslProtocols.Tls12
            };

            _httpClient = new HttpClient(httpClientHandler);
            if (_configuration.IgnoreSelfSignedCertificates)
                httpClientHandler.ServerCertificateCustomValidationCallback =
                    (httpRequestMessage, cert, cetChain, policyErrors) => true;

            if (_configuration.Proxy.Enable)
                httpClientHandler.Proxy = new WebProxy(_configuration.Proxy.Url, false, new string[] { },
                    new NetworkCredential(_configuration.Proxy.Username, _configuration.Proxy.Password));

            _httpClient = new HttpClient(httpClientHandler);
            if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
                _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",
                    Convert.ToBase64String(Encoding.Default.GetBytes($"{_server.Username}:{_server.Password}")));

            if (!Uri.TryCreate(_server.Url, UriKind.Absolute, out _httpUri))
            {
                Log.Error($"No HTTP URL could be built out of the supplied server URI {_server.Url}");
                return;
            }

            // Build the web sockets URI.
            var webSocketsUriBuilder = new UriBuilder(_httpUri);
            switch (webSocketsUriBuilder.Scheme.ToUpperInvariant())
            {
                case "HTTP":
                    webSocketsUriBuilder.Scheme = "ws";
                    break;
                case "HTTPS":
                    webSocketsUriBuilder.Scheme = "wss";
                    break;
            }

            if (!Uri.TryCreate(webSocketsUriBuilder.Uri, "stream", out var combinedUri))
            {
                Log.Error($"No WebSockets URL could be built from the provided URL {_server.Url}.");
            }

            _webSocketsUri = combinedUri;
        }

        public void Dispose()
        {
            if (_localCancellationTokenSource != null)
            {
                _localCancellationTokenSource.Dispose();
                _localCancellationTokenSource = null;
            }

            if (_tplWebSocketsBufferBlockTransformLink != null)
            {
                _tplWebSocketsBufferBlockTransformLink.Dispose();
                _tplWebSocketsBufferBlockTransformLink = null;
            }

            if (_tplRetrievePastMessagesLink != null)
            {
                _tplRetrievePastMessagesLink.Dispose();
                _tplRetrievePastMessagesLink = null;
            }

            if (_webSocketSharp != null)
            {
                _webSocketSharp.Close();
                _webSocketSharp = null;
            }

            if (_httpClient != null)
            {
                _httpClient.Dispose();
                _httpClient = null;
            }
        }

        #endregion

        #region Public Methods

        public async Task Stop()
        {
            _localCancellationTokenSource.Cancel();

            if (_heartBeatTask != null)
            {
                await _heartBeatTask;
            }

            if (_retrievePastMessagesTask != null)
            {
                await _retrievePastMessagesTask;
            }

            if (_webSocketSharp != null)
            {
                _webSocketSharp.OnMessage -= WebSocketSharp_OnMessage;
                _webSocketSharp.OnError -= WebSocketSharp_OnError;
                _webSocketSharp.OnOpen -= WebSocketSharp_OnOpen;
                _webSocketSharp.OnClose -= WebSocketSharp_OnClose;

                _webSocketSharp.Close();
                _webSocketSharp = null;
            }
        }

        public void Start()
        {
            if (_webSocketsUri == null || _httpUri == null)
            {
                Log.Error("Could not start connection to server due to unreadable URLs");
                return;
            }


            _localCancellationTokenSource = new CancellationTokenSource();
            _localCancellationToken = _localCancellationTokenSource.Token;

            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(new [] { _programCancellationToken, _localCancellationToken });
            _cancellationToken = _cancellationTokenSource.Token;

            if (_webSocketSharp != null)
            {
                return;
            }

            _webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri);
            _webSocketSharp.EmitOnPing = true;
            _webSocketSharp.WaitTime = TimeSpan.FromMinutes(1);
            _webSocketSharp.SslConfiguration = new ClientSslConfiguration(_webSocketsUri.Host,
                new X509CertificateCollection(new X509Certificate[] { }), SslProtocols.Tls12, false);

            if (_configuration.Proxy.Enable)
            {
                if (!string.IsNullOrEmpty(_configuration.Proxy.Url))
                {
                    _webSocketSharp.SetProxy(_configuration.Proxy.Url, _configuration.Proxy.Username, _configuration.Proxy.Password);
                }
            }

            if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
            {
                _webSocketSharp.SetCredentials(_server.Username, _server.Password, true);
            }

            if (_configuration.IgnoreSelfSignedCertificates)
            {
                _webSocketSharp.SslConfiguration.ServerCertificateValidationCallback +=
                    (sender, certificate, chain, errors) => true;
            }

            _webSocketSharp.Log.Output = (logData, s) =>
            {
                Log.Information($"WebSockets low level logging reported: {logData.Message}");
            };

            _webSocketSharp.OnMessage += WebSocketSharp_OnMessage;
            _webSocketSharp.OnError += WebSocketSharp_OnError;
            _webSocketSharp.OnOpen += WebSocketSharp_OnOpen;
            _webSocketSharp.OnClose += WebSocketSharp_OnClose;

            _webSocketSharp.Connect();
            _heartBeatTask = HeartBeat(_cancellationToken);

            if (_configuration.RetrievePastNotificationHours != 0)
            {
                _retrievePastMessagesTask = RetrievePastMessages(_cancellationToken);
            }
        }

        private async void WebSocketSharp_OnClose(object sender, CloseEventArgs e)
        {
            Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}");

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

                Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

                await Stop().ContinueWith(task => Start(), _programCancellationToken);
            }
            catch (Exception exception)
            {
                Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
            }
        }

        private void WebSocketSharp_OnOpen(object sender, EventArgs e)
        {
            Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open");

            _webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
        }

        private async void OnServerNotResponding()
        {
            Log.Warning($"Server {_server.Name} has not responded in a long while...");

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

                Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

                await Stop().ContinueWith(task => Start(), _programCancellationToken);
            }
            catch (Exception exception)
            {
                Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
            }
        }

        private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e)
        {
            Log.Error(
                $"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}",
                e.Exception);

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

                Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

                await Stop().ContinueWith(task => Start(), _programCancellationToken);
            }
            catch (Exception exception)
            {
                Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
            }
        }

        private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e)
        {
            if (e.IsPing)
            {
                Log.Information($"Server {_server.Name} sent PING message");

                _webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
                return;
            }

            await _webSocketMessageBufferBlock.SendAsync(new GotifyConnectionData(e.RawData, _server), _cancellationToken);
        }

        #endregion

        #region Private Methods

        private async Task RetrievePastMessages(CancellationToken cancellationToken)
        {
            var gotifyApplicationBufferBlock = new BufferBlock<GotifyConnectionApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken });
            var gotifyApplicationActionBlock = new ActionBlock<GotifyConnectionApplication>(async gotifyConnectionApplication =>
            {
                if (!Uri.TryCreate(_httpUri, $"application/{gotifyConnectionApplication.Application.Id}/message", out var combinedUri))
                {
                    Log.Error($"Could not get application message Uri {gotifyConnectionApplication.Application.Id}.");

                    return;
                }

                try
                {
                    using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
                    using var streamReader = new StreamReader(messageStream);
                    using var jsonTextReader = new JsonTextReader(streamReader);

                    var gotifyMessageQuery = _jsonSerializer.Deserialize<GotifyMessageQuery>(jsonTextReader) ??
                                             throw new ArgumentNullException();

                    if (gotifyMessageQuery.Messages == null)
                    {
                        Log.Warning("Invalid application messages deserialized.");

                        return;
                    }

                    foreach (var message in gotifyMessageQuery.Messages)
                    {
                        if (message.Date < DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours))
                        {
                            continue;
                        }

                        using var imageStream = await RetrieveGotifyApplicationImage(message.AppId);
                        if (imageStream == null || imageStream.Length == 0)
                        {
                            Log.Warning("Could not find any application image for notification.");

                            continue;
                        }

                        var image = new Bitmap(imageStream);
                        message.Server = gotifyConnectionApplication.Server;

                        GotifyMessage?.Invoke(this, new GotifyMessageEventArgs(message, image));
                    }
                }
                catch (HttpRequestException exception)
                {
                    Log.Warning(exception, $"Could not get application {gotifyConnectionApplication.Application.Id}.");
                }
                catch (JsonSerializationException exception)
                {
                    Log.Warning(exception,
                        $"Could not deserialize the message response for application {gotifyConnectionApplication.Application.Id}.");
                }
                catch (Exception exception)
                {
                    Log.Warning(exception, "Generic failure.");
                }
            }, new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });

            using var _1 = gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock,
                new DataflowLinkOptions { PropagateCompletion = true });

            var tasks = new ConcurrentBag<Task>();
            foreach (var application in await RetrieveGotifyApplications())
            {
                var gotifyConnectionApplication = new GotifyConnectionApplication(_server, application);

                tasks.Add(gotifyApplicationBufferBlock.SendAsync(gotifyConnectionApplication, cancellationToken));
            }

            await Task.WhenAll(tasks);
            gotifyApplicationBufferBlock.Complete();
            await gotifyApplicationActionBlock.Completion;
        }

        private async Task HeartBeat(CancellationToken cancellationToken)
        {
            try
            {
                do
                {
                    await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);

                    _webSocketsClientPingStopWatch.Restart();
                    if (!_webSocketSharp.Ping())
                    {
                        Log.Warning($"Server {_server.Name} did not respond to PING message.");
                        continue;
                    }

                    var delta = _webSocketsClientPingStopWatch.ElapsedMilliseconds;

                    Log.Information($"PING response latency for {_server.Name} is {delta}ms");

                    _webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);

                } while (!cancellationToken.IsCancellationRequested);
            }
            catch (Exception exception) when (exception is OperationCanceledException ||
                                              exception is ObjectDisposedException)
            {
            }
            catch (Exception exception)
            {
                Log.Warning(exception, $"Heartbeat for server {_server.Name} has failed.");
            }
        }

        private async Task<GotifyApplication[]> RetrieveGotifyApplications()
        {
            if (!Uri.TryCreate(_httpUri, "application", out var combinedUri))
            {
                Log.Error($"No application URL could be built for {_server.Url}.");

                return Array.Empty<GotifyApplication>();
            }
            
            try
            {
                using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
                using var streamReader = new StreamReader(messageStream);
                using var jsonTextReader = new JsonTextReader(streamReader);

                return _jsonSerializer.Deserialize<GotifyApplication[]>(jsonTextReader);
            }
            catch (Exception exception)
            {
                Log.Warning(exception,"Could not deserialize the list of applications from the server.");

                return Array.Empty<GotifyApplication>();
            }
        }

        private async Task<Stream> RetrieveGotifyApplicationImage(int appId)
        {
            var memoryStream = new MemoryStream();

            foreach (var application in await RetrieveGotifyApplications())
            {
                if (application.Id != appId)
                {
                    continue;
                }

                if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, out var applicationImageUri))
                {
                    Log.Warning("Could not build URL path to application icon");

                    continue;
                }

                try
                {
                    using var imageResponse = await _httpClient.GetStreamAsync(applicationImageUri);

                    await imageResponse.CopyToAsync(memoryStream);

                    return memoryStream;
                }
                catch (Exception exception)
                {
                    Log.Error(exception,"Could not retrieve application image.");
                }
            }

            return memoryStream;
        }

        #endregion
    }
}

Generated by GNU Enscript 1.6.5.90.