Winify

Subversion Repositories:
Compare Path: Rev
With Path: Rev
?path1? @ 59  →  ?path2? @ 61
/trunk/Winify/Gotify/GotifyConnection.cs
@@ -1,19 +1,28 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.Caching;
using System.Runtime.CompilerServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
using Newtonsoft.Json;
using Serilog;
using Servers;
using WebSocketSharp;
using WebSocketSharp.Net;
using Winify.Utilities;
using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
using NetworkCredential = System.Net.NetworkCredential;
 
@@ -45,7 +54,15 @@
private WebSocket _webSocketSharp;
private readonly Configuration.Configuration _configuration;
private Task _initTask;
private IDisposable _tplRetrievePastMessagesLink;
private IDisposable _tplWebSocketsBufferBlockTransformLink;
private IDisposable _tplWebSocketsTransformActionLink;
private IDisposable _tplWebSocketsTransformActionNullLink;
private readonly BufferBlock<byte[]> _webSocketMessageBufferBlock;
private readonly Stopwatch _webSocketsClientPingStopWatch;
private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation;
 
private readonly MemoryCache _applicationImageCache;
#endregion
 
#region Constructors, Destructors and Finalizers
@@ -52,6 +69,86 @@
 
private GotifyConnection()
{
_applicationImageCache = new MemoryCache("GotifyApplicationImageCache");
_webSocketsServerResponseScheduledContinuation = new ScheduledContinuation();
_webSocketsClientPingStopWatch = new Stopwatch();
 
_webSocketMessageBufferBlock = new BufferBlock<byte[]>(new DataflowBlockOptions { CancellationToken = _cancellationToken });
var webSocketTransformBlock = new TransformBlock<byte[], GotifyMessage>(bytes =>
{
if (bytes.Length == 0)
{
return null;
}
 
var message = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
 
GotifyMessage gotifyNotification;
 
try
{
gotifyNotification = JsonConvert.DeserializeObject<GotifyMessage>(message);
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize notification: {exception.Message}");
 
return null;
}
 
if (gotifyNotification == null)
{
Log.Warning($"Could not deserialize gotify notification: {message}");
 
return null;
}
 
return gotifyNotification;
 
}, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });
 
var webSocketActionBlock = new ActionBlock<GotifyMessage>(async message =>
{
message.Server = _server;
 
var cachedImage = _applicationImageCache.Get($"{message.AppId}");
if (cachedImage is Image applicationImage)
{
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, applicationImage));
return;
}
 
using (var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken))
{
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification");
return;
}
 
var image = Image.FromStream(imageStream);
 
_applicationImageCache.Add($"{message.AppId}", image.Clone(),
new CacheItemPolicy
{
SlidingExpiration = TimeSpan.FromHours(1)
});
 
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
}
 
Log.Debug($"Notification message received: {message.Message}");
 
}, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });
 
_tplWebSocketsBufferBlockTransformLink = _webSocketMessageBufferBlock.LinkTo(webSocketTransformBlock,
new DataflowLinkOptions { PropagateCompletion = true });
_tplWebSocketsTransformActionLink = webSocketTransformBlock.LinkTo(webSocketActionBlock,
new DataflowLinkOptions { PropagateCompletion = true }, message => message != null);
_tplWebSocketsTransformActionNullLink = webSocketTransformBlock.LinkTo(DataflowBlock.NullTarget<GotifyMessage>(),
new DataflowLinkOptions() { PropagateCompletion = true });
}
 
public GotifyConnection(Server server, Configuration.Configuration configuration) : this()
@@ -119,6 +216,30 @@
_cancellationTokenSource = null;
}
 
if (_tplWebSocketsBufferBlockTransformLink != null)
{
_tplWebSocketsBufferBlockTransformLink.Dispose();
_tplWebSocketsBufferBlockTransformLink = null;
}
 
if (_tplWebSocketsTransformActionLink != null)
{
_tplWebSocketsTransformActionLink.Dispose();
_tplWebSocketsTransformActionLink = null;
}
 
if (_tplWebSocketsTransformActionNullLink != null)
{
_tplWebSocketsTransformActionNullLink.Dispose();
_tplWebSocketsTransformActionNullLink = null;
}
 
if (_tplRetrievePastMessagesLink != null)
{
_tplRetrievePastMessagesLink.Dispose();
_tplRetrievePastMessagesLink = null;
}
 
if (_webSocketSharp != null)
{
_webSocketSharp.Close();
@@ -154,12 +275,14 @@
_initTask = RetrievePastMessages(_cancellationToken);
}
 
_runTask = Run(_cancellationToken);
_runTask = HeartBeat(_cancellationToken);
}
 
private void Connect()
{
_webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri);
_webSocketSharp.EmitOnPing = true;
_webSocketSharp.WaitTime = TimeSpan.FromMinutes(1);
_webSocketSharp.SslConfiguration = new ClientSslConfiguration(_webSocketsUri.Host,
new X509CertificateCollection(new X509Certificate[] { }), SslProtocols.Tls12, false);
if (_configuration.Proxy.Enable)
@@ -195,8 +318,15 @@
private void WebSocketSharp_OnOpen(object sender, EventArgs e)
{
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open");
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
}
 
private void OnUnresponsiveServer()
{
Log.Warning($"Server {_server} has not responded in a long while...");
}
 
private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e)
{
Log.Error(
@@ -210,6 +340,7 @@
}
 
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken);
 
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
 
Connect();
@@ -217,64 +348,16 @@
 
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e)
{
if (e.RawData.Length == 0)
if (e.IsPing)
{
Log.Warning("Empty message received from server");
return;
}
Log.Information($"Server {_server} sent PING message");
 
var message = Encoding.UTF8.GetString(e.RawData, 0, e.RawData.Length);
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
 
GotifyMessage gotifyNotification;
 
try
{
gotifyNotification = JsonConvert.DeserializeObject<GotifyMessage>(message);
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize notification: {exception.Message}");
return;
}
 
if (gotifyNotification == null)
{
Log.Warning($"Could not deserialize gotify notification: {message}");
 
return;
}
 
gotifyNotification.Server = _server;
 
var applicationUriBuilder = new UriBuilder(_httpUri);
try
{
applicationUriBuilder.Path = Path.Combine(applicationUriBuilder.Path, "application");
}
catch (ArgumentException exception)
{
Log.Warning("Could not build an URI to an application");
 
return;
}
 
using (var imageStream =
await RetrieveGotifyApplicationImage(gotifyNotification.AppId, applicationUriBuilder.Uri,
_cancellationToken))
{
if (imageStream == null)
{
Log.Warning("Could not find any application image for notification");
return;
}
 
var image = Image.FromStream(imageStream);
 
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(gotifyNotification, image));
}
 
Log.Debug($"Notification message received: {gotifyNotification.Message}");
await _webSocketMessageBufferBlock.SendAsync(e.RawData, _cancellationToken);
}
 
public void Stop()
@@ -291,7 +374,9 @@
private async Task RetrievePastMessages(CancellationToken cancellationToken)
{
var messageUriBuilder = new UriBuilder(_httpUri);
foreach (var application in await RetrieveGotifyApplications(cancellationToken))
 
var gotifyApplicationBufferBlock = new BufferBlock<GotifyApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken });
var gotifyApplicationActionBlock = new ActionBlock<GotifyApplication>(async application =>
{
try
{
@@ -302,12 +387,22 @@
{
Log.Error($"No application URL could be built for {_server.Url} due to {exception.Message}");
 
continue;
return;
}
 
var messagesResponse = await _httpClient.GetAsync(messageUriBuilder.Uri, cancellationToken);
HttpResponseMessage messagesResponse;
try
{
messagesResponse = await _httpClient.GetAsync(messageUriBuilder.Uri, cancellationToken);
}
catch (Exception exception)
{
Log.Error($"Could not get application {application.Id} due to {exception.Message}");
 
return;
}
 
 
var messages = await messagesResponse.Content.ReadAsStringAsync();
 
GotifyMessageQuery gotifyMessageQuery;
@@ -320,54 +415,75 @@
{
Log.Warning($"Could not deserialize the message response: {exception.Message}");
 
continue;
return;
}
 
var applicationUriBuilder = new UriBuilder(_httpUri);
try
foreach (var message in gotifyMessageQuery.Messages.Where(message => message.Date >= DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours)))
{
applicationUriBuilder.Path = Path.Combine(applicationUriBuilder.Path, "application");
}
catch (ArgumentException exception)
{
Log.Warning($"Could not build an URI to an application: {exception}");
message.Server = _server;
 
return;
}
var cachedImage = _applicationImageCache.Get($"{message.AppId}");
if (cachedImage is Image applicationImage)
{
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, applicationImage));
return;
}
 
foreach (var message in gotifyMessageQuery.Messages)
{
if (message.Date < DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours))
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken);
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification");
continue;
}
 
message.Server = _server;
var image = Image.FromStream(imageStream);
 
using (var imageStream =
await RetrieveGotifyApplicationImage(message.AppId, applicationUriBuilder.Uri,
_cancellationToken))
{
if (imageStream == null)
_applicationImageCache.Add($"{message.AppId}", image.Clone(),
new CacheItemPolicy
{
Log.Warning("Could not find any application image for notification");
return;
}
SlidingExpiration = TimeSpan.FromHours(1)
});
 
var image = Image.FromStream(imageStream);
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
}
 
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
}
}
}, new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
 
gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock,
new DataflowLinkOptions { PropagateCompletion = true });
 
await foreach (var application in RetrieveGotifyApplications(cancellationToken))
{
await gotifyApplicationBufferBlock.SendAsync(application, cancellationToken);
}
 
gotifyApplicationBufferBlock.Complete();
await gotifyApplicationActionBlock.Completion;
 
}
 
private async Task Run(CancellationToken cancellationToken)
private async Task HeartBeat(CancellationToken cancellationToken)
{
try
{
do
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
 
_webSocketsClientPingStopWatch.Restart();
if (!_webSocketSharp.Ping())
{
Log.Warning($"Server {_server} did not respond to PING message.");
continue;
}
 
var delta = _webSocketsClientPingStopWatch.ElapsedMilliseconds;
 
Log.Information($"PING response latency for {_server} is {delta}ms");
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
} while (!cancellationToken.IsCancellationRequested);
}
catch (Exception exception) when (exception is OperationCanceledException ||
@@ -376,11 +492,11 @@
}
catch (Exception exception)
{
Log.Warning(exception, "Failure running connection loop");
Log.Warning(exception, $"Heartbeat for server {_server} has failed due to {exception.Message}");
}
}
 
private async Task<GotifyApplication[]> RetrieveGotifyApplications(CancellationToken cancellationToken)
private async IAsyncEnumerable<GotifyApplication> RetrieveGotifyApplications([EnumeratorCancellation] CancellationToken cancellationToken)
{
var applicationsUriBuilder = new UriBuilder(_httpUri);
try
@@ -390,35 +506,25 @@
catch (ArgumentException exception)
{
Log.Error($"No application URL could be built for {_server.Url} due to {exception}");
 
yield break;
}
 
var applicationsResponse = await _httpClient.GetAsync(applicationsUriBuilder.Uri, cancellationToken);
HttpResponseMessage applicationsResponse;
 
var applications = await applicationsResponse.Content.ReadAsStringAsync();
 
GotifyApplication[] gotifyApplications;
try
{
gotifyApplications =
JsonConvert.DeserializeObject<GotifyApplication[]>(applications);
applicationsResponse = await _httpClient.GetAsync(applicationsUriBuilder.Uri, cancellationToken);
}
catch (JsonSerializationException exception)
catch (Exception exception)
{
Log.Warning($"Could not deserialize the list of applications from the server: {exception}");
Log.Error($"Could not retrieve applications: {exception.Message}");
 
return null;
yield break;
}
 
return gotifyApplications;
}
var applications = await applicationsResponse.Content.ReadAsStringAsync();
 
private async Task<Stream> RetrieveGotifyApplicationImage(int appId, Uri applicationUri,
CancellationToken cancellationToken)
{
var applicationResponse = await _httpClient.GetAsync(applicationUri, cancellationToken);
 
var applications = await applicationResponse.Content.ReadAsStringAsync();
 
GotifyApplication[] gotifyApplications;
try
{
@@ -427,32 +533,52 @@
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize the list of applications from the server: {exception.Message}");
Log.Warning($"Could not deserialize the list of applications from the server: {exception}");
 
return null;
yield break;
}
 
foreach (var application in gotifyApplications)
{
yield return application;
}
}
 
private async Task<Stream> RetrieveGotifyApplicationImage(int appId, CancellationToken cancellationToken)
{
await foreach (var application in RetrieveGotifyApplications(cancellationToken))
{
if (application.Id != appId) continue;
 
if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute,
out var applicationImageUri))
if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, out var applicationImageUri))
{
Log.Warning("Could not build URL path to application icon");
continue;
}
 
var imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken);
HttpResponseMessage imageResponse;
 
try
{
imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken);
}
catch (Exception exception)
{
Log.Error($"Could not retrieve application image: {exception.Message}");
 
return new MemoryStream();
}
 
var memoryStream = new MemoryStream();
 
await imageResponse.Content.CopyToAsync(memoryStream);
 
memoryStream.Position = 0L;
 
return memoryStream;
}
 
return null;
return new MemoryStream();
}
 
#endregion