Winify – Rev 64
?pathlinks?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.Caching;
using System.Runtime.CompilerServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
using Newtonsoft.Json;
using Serilog;
using Servers;
using WebSocketSharp;
using WebSocketSharp.Net;
using Winify.Utilities;
using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
using NetworkCredential = System.Net.NetworkCredential;
namespace Winify.Gotify
{
public class GotifyConnection : IDisposable
{
#region Public Events & Delegates
public event EventHandler<GotifyNotificationEventArgs> GotifyNotification;
#endregion
#region Private Delegates, Events, Enums, Properties, Indexers and Fields
private readonly Server _server;
private CancellationToken _cancellationToken;
private CancellationTokenSource _cancellationTokenSource;
private Task _runTask;
private HttpClient _httpClient;
private readonly Uri _webSocketsUri;
private readonly Uri _httpUri;
private WebSocket _webSocketSharp;
private readonly Configuration.Configuration _configuration;
private Task _initTask;
private IDisposable _tplRetrievePastMessagesLink;
private IDisposable _tplWebSocketsBufferBlockTransformLink;
private IDisposable _tplWebSocketsTransformActionLink;
private IDisposable _tplWebSocketsTransformActionNullLink;
private readonly BufferBlock<byte[]> _webSocketMessageBufferBlock;
private readonly Stopwatch _webSocketsClientPingStopWatch;
private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation;
private readonly MemoryCache _applicationImageCache;
#endregion
#region Constructors, Destructors and Finalizers
private GotifyConnection()
{
_applicationImageCache = new MemoryCache("GotifyApplicationImageCache");
_webSocketsServerResponseScheduledContinuation = new ScheduledContinuation();
_webSocketsClientPingStopWatch = new Stopwatch();
_webSocketMessageBufferBlock = new BufferBlock<byte[]>(new DataflowBlockOptions { CancellationToken = _cancellationToken });
var webSocketTransformBlock = new TransformBlock<byte[], GotifyMessage>(bytes =>
{
if (bytes.Length == 0)
{
return null;
}
var message = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
GotifyMessage gotifyNotification;
try
{
gotifyNotification = JsonConvert.DeserializeObject<GotifyMessage>(message);
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize notification: {exception.Message}");
return null;
}
if (gotifyNotification == null)
{
Log.Warning($"Could not deserialize gotify notification: {message}");
return null;
}
return gotifyNotification;
}, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });
var webSocketActionBlock = new ActionBlock<GotifyMessage>(async message =>
{
message.Server = _server;
var cachedImage = _applicationImageCache.Get($"{message.AppId}");
if (cachedImage is Stream cachedImageStream)
{
using var cachedImageMemoryStream = new MemoryStream();
await cachedImageStream.CopyToAsync(cachedImageMemoryStream);
var cachedApplicationImage = new Bitmap(cachedImageMemoryStream);
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, cachedApplicationImage));
return;
}
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken);
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification");
return;
}
using var memoryStream = new MemoryStream();
await imageStream.CopyToAsync(memoryStream);
var imageBytes = memoryStream.ToArray();
_applicationImageCache.Add($"{message.AppId}", imageBytes,
new CacheItemPolicy
{
SlidingExpiration = TimeSpan.FromHours(1)
});
var image = new Bitmap(memoryStream);
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
Log.Debug($"Notification message received: {message.Message}");
}, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });
_tplWebSocketsBufferBlockTransformLink = _webSocketMessageBufferBlock.LinkTo(webSocketTransformBlock,
new DataflowLinkOptions { PropagateCompletion = true });
_tplWebSocketsTransformActionLink = webSocketTransformBlock.LinkTo(webSocketActionBlock,
new DataflowLinkOptions { PropagateCompletion = true }, message => message != null);
_tplWebSocketsTransformActionNullLink = webSocketTransformBlock.LinkTo(DataflowBlock.NullTarget<GotifyMessage>(),
new DataflowLinkOptions() { PropagateCompletion = true });
}
public GotifyConnection(Server server, Configuration.Configuration configuration) : this()
{
_server = server;
_configuration = configuration;
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
var httpClientHandler = new HttpClientHandler
{
// mono does not implement this
//SslProtocols = SslProtocols.Tls12
};
_httpClient = new HttpClient(httpClientHandler);
if (_configuration.IgnoreSelfSignedCertificates)
httpClientHandler.ServerCertificateCustomValidationCallback =
(httpRequestMessage, cert, cetChain, policyErrors) => true;
if (_configuration.Proxy.Enable)
httpClientHandler.Proxy = new WebProxy(_configuration.Proxy.Url, false, new string[] { },
new NetworkCredential(_configuration.Proxy.Username, _configuration.Proxy.Password));
_httpClient = new HttpClient(httpClientHandler);
if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",
Convert.ToBase64String(Encoding.Default.GetBytes($"{_server.Username}:{_server.Password}")));
if (!Uri.TryCreate(_server.Url, UriKind.Absolute, out _httpUri))
{
Log.Error($"No HTTP URL could be built out of the supplied server URI {_server.Url}");
return;
}
// Build the web sockets URI.
var webSocketsUriBuilder = new UriBuilder(_httpUri);
switch (webSocketsUriBuilder.Scheme.ToUpperInvariant())
{
case "HTTP":
webSocketsUriBuilder.Scheme = "ws";
break;
case "HTTPS":
webSocketsUriBuilder.Scheme = "wss";
break;
}
try
{
webSocketsUriBuilder.Path = Path.Combine(webSocketsUriBuilder.Path, "stream");
}
catch (ArgumentException exception)
{
Log.Error(
$"No WebSockets URL could be built from the provided URL {_server.Url} due to {exception.Message}");
}
_webSocketsUri = webSocketsUriBuilder.Uri;
}
public void Dispose()
{
if (_cancellationTokenSource != null)
{
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}
if (_tplWebSocketsBufferBlockTransformLink != null)
{
_tplWebSocketsBufferBlockTransformLink.Dispose();
_tplWebSocketsBufferBlockTransformLink = null;
}
if (_tplWebSocketsTransformActionLink != null)
{
_tplWebSocketsTransformActionLink.Dispose();
_tplWebSocketsTransformActionLink = null;
}
if (_tplWebSocketsTransformActionNullLink != null)
{
_tplWebSocketsTransformActionNullLink.Dispose();
_tplWebSocketsTransformActionNullLink = null;
}
if (_tplRetrievePastMessagesLink != null)
{
_tplRetrievePastMessagesLink.Dispose();
_tplRetrievePastMessagesLink = null;
}
if (_webSocketSharp != null)
{
_webSocketSharp.Close();
_webSocketSharp = null;
}
if (_httpClient != null)
{
_httpClient.Dispose();
_httpClient = null;
}
}
#endregion
#region Public Methods
public void Start()
{
if (_webSocketsUri == null || _httpUri == null)
{
Log.Error("Could not start connection to server due to unreadable URLs");
return;
}
_cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token;
Connect();
if (_configuration.RetrievePastNotificationHours != 0)
{
_initTask = RetrievePastMessages(_cancellationToken);
}
_runTask = HeartBeat(_cancellationToken);
}
private void Connect()
{
_webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri);
_webSocketSharp.EmitOnPing = true;
_webSocketSharp.WaitTime = TimeSpan.FromMinutes(1);
_webSocketSharp.SslConfiguration = new ClientSslConfiguration(_webSocketsUri.Host,
new X509CertificateCollection(new X509Certificate[] { }), SslProtocols.Tls12, false);
if (_configuration.Proxy.Enable)
_webSocketSharp.SetProxy(_configuration.Proxy.Url, _configuration.Proxy.Username,
_configuration.Proxy.Password);
if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
_webSocketSharp.SetCredentials(_server.Username, _server.Password, true);
if (_configuration.IgnoreSelfSignedCertificates)
_webSocketSharp.SslConfiguration.ServerCertificateValidationCallback +=
(sender, certificate, chain, errors) => true;
_webSocketSharp.Log.Output = (logData, s) =>
{
Log.Information($"WebSockets low level logging reported: {logData.Message}");
};
_webSocketSharp.OnMessage += WebSocketSharp_OnMessage;
_webSocketSharp.OnError += WebSocketSharp_OnError;
_webSocketSharp.OnOpen += WebSocketSharp_OnOpen;
_webSocketSharp.OnClose += WebSocketSharp_OnClose;
_webSocketSharp.ConnectAsync();
}
private void WebSocketSharp_OnClose(object sender, CloseEventArgs e)
{
Log.Information(
$"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}");
}
private void WebSocketSharp_OnOpen(object sender, EventArgs e)
{
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open");
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
}
private void OnUnresponsiveServer()
{
Log.Warning($"Server {_server} has not responded in a long while...");
}
private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e)
{
Log.Error(
$"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}",
e.Exception);
if (_cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken);
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
Connect();
}
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e)
{
if (e.IsPing)
{
Log.Information($"Server {_server} sent PING message");
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
return;
}
await _webSocketMessageBufferBlock.SendAsync(e.RawData, _cancellationToken);
}
public void Stop()
{
if (_cancellationTokenSource == null) return;
_cancellationTokenSource.Cancel();
}
#endregion
#region Private Methods
private async Task RetrievePastMessages(CancellationToken cancellationToken)
{
var messageUriBuilder = new UriBuilder(_httpUri);
var gotifyApplicationBufferBlock = new BufferBlock<GotifyApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken });
var gotifyApplicationActionBlock = new ActionBlock<GotifyApplication>(async application =>
{
try
{
messageUriBuilder.Path = Path.Combine(messageUriBuilder.Path, "application", $"{application.Id}",
"message");
}
catch (ArgumentException exception)
{
Log.Error($"No application URL could be built for {_server.Url} due to {exception.Message}");
return;
}
HttpResponseMessage messagesResponse;
try
{
messagesResponse = await _httpClient.GetAsync(messageUriBuilder.Uri, cancellationToken);
}
catch (Exception exception)
{
Log.Error($"Could not get application {application.Id} due to {exception.Message}");
return;
}
var messages = await messagesResponse.Content.ReadAsStringAsync();
GotifyMessageQuery gotifyMessageQuery;
try
{
gotifyMessageQuery =
JsonConvert.DeserializeObject<GotifyMessageQuery>(messages);
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize the message response: {exception.Message}");
return;
}
foreach (var message in gotifyMessageQuery.Messages.Where(message => message.Date >= DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours)))
{
message.Server = _server;
var cachedImage = _applicationImageCache.Get($"{message.AppId}");
if (cachedImage is Stream cachedImageStream)
{
using var cachedImageMemoryStream = new MemoryStream();
await cachedImageStream.CopyToAsync(cachedImageMemoryStream);
var cachedApplicationImage = new Bitmap(cachedImageMemoryStream);
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, cachedApplicationImage));
return;
}
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken);
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification");
continue;
}
using var memoryStream = new MemoryStream();
await imageStream.CopyToAsync(memoryStream);
var imageBytes = memoryStream.ToArray();
_applicationImageCache.Add($"{message.AppId}", imageBytes,
new CacheItemPolicy
{
SlidingExpiration = TimeSpan.FromHours(1)
});
var image = new Bitmap(memoryStream);
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
}
}, new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock,
new DataflowLinkOptions { PropagateCompletion = true });
await foreach (var application in RetrieveGotifyApplications(cancellationToken))
{
await gotifyApplicationBufferBlock.SendAsync(application, cancellationToken);
}
gotifyApplicationBufferBlock.Complete();
await gotifyApplicationActionBlock.Completion;
}
private async Task HeartBeat(CancellationToken cancellationToken)
{
try
{
do
{
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
_webSocketsClientPingStopWatch.Restart();
if (!_webSocketSharp.Ping())
{
Log.Warning($"Server {_server} did not respond to PING message.");
continue;
}
var delta = _webSocketsClientPingStopWatch.ElapsedMilliseconds;
Log.Information($"PING response latency for {_server} is {delta}ms");
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
} while (!cancellationToken.IsCancellationRequested);
}
catch (Exception exception) when (exception is OperationCanceledException ||
exception is ObjectDisposedException)
{
}
catch (Exception exception)
{
Log.Warning(exception, $"Heartbeat for server {_server} has failed due to {exception.Message}");
}
}
private async IAsyncEnumerable<GotifyApplication> RetrieveGotifyApplications([EnumeratorCancellation] CancellationToken cancellationToken)
{
var applicationsUriBuilder = new UriBuilder(_httpUri);
try
{
applicationsUriBuilder.Path = Path.Combine(applicationsUriBuilder.Path, "application");
}
catch (ArgumentException exception)
{
Log.Error($"No application URL could be built for {_server.Url} due to {exception}");
yield break;
}
HttpResponseMessage applicationsResponse;
try
{
applicationsResponse = await _httpClient.GetAsync(applicationsUriBuilder.Uri, cancellationToken);
}
catch (Exception exception)
{
Log.Error($"Could not retrieve applications: {exception.Message}");
yield break;
}
var applications = await applicationsResponse.Content.ReadAsStringAsync();
GotifyApplication[] gotifyApplications;
try
{
gotifyApplications =
JsonConvert.DeserializeObject<GotifyApplication[]>(applications);
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize the list of applications from the server: {exception}");
yield break;
}
foreach (var application in gotifyApplications)
{
yield return application;
}
}
private async Task<Stream> RetrieveGotifyApplicationImage(int appId, CancellationToken cancellationToken)
{
await foreach (var application in RetrieveGotifyApplications(cancellationToken))
{
if (application.Id != appId) continue;
if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, out var applicationImageUri))
{
Log.Warning("Could not build URL path to application icon");
continue;
}
HttpResponseMessage imageResponse;
try
{
imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken);
}
catch (Exception exception)
{
Log.Error($"Could not retrieve application image: {exception.Message}");
return new MemoryStream();
}
var memoryStream = new MemoryStream();
await imageResponse.Content.CopyToAsync(memoryStream);
memoryStream.Position = 0L;
return memoryStream;
}
return new MemoryStream();
}
#endregion
}
}
Generated by GNU Enscript 1.6.5.90.