Winify

Subversion Repositories:
Compare Path: Rev
With Path: Rev
?path1? @ 64  →  ?path2? @ 66
/trunk/Winify/Gotify/GotifyConnection.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
@@ -18,6 +19,7 @@
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
using Newtonsoft.Json;
using Org.BouncyCastle.Asn1.Pkcs;
using Serilog;
using Servers;
using WebSocketSharp;
@@ -44,7 +46,7 @@
 
private CancellationTokenSource _cancellationTokenSource;
 
private Task _runTask;
private Task _heartBeatTask;
 
private HttpClient _httpClient;
 
@@ -53,16 +55,17 @@
private readonly Uri _httpUri;
private WebSocket _webSocketSharp;
private readonly Configuration.Configuration _configuration;
private Task _initTask;
private IDisposable _tplRetrievePastMessagesLink;
private IDisposable _tplWebSocketsBufferBlockTransformLink;
private IDisposable _tplWebSocketsTransformActionLink;
private IDisposable _tplWebSocketsTransformActionNullLink;
private readonly BufferBlock<byte[]> _webSocketMessageBufferBlock;
private readonly BufferBlock<GotifyConnectionData> _webSocketMessageBufferBlock;
private readonly Stopwatch _webSocketsClientPingStopWatch;
private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation;
private Task _retrievePastMessagesTask;
private static JsonSerializer _jsonSerializer;
 
private readonly MemoryCache _applicationImageCache;
#endregion
 
#region Constructors, Destructors and Finalizers
@@ -69,36 +72,41 @@
 
private GotifyConnection()
{
_applicationImageCache = new MemoryCache("GotifyApplicationImageCache");
_jsonSerializer = new JsonSerializer();
_webSocketsServerResponseScheduledContinuation = new ScheduledContinuation();
_webSocketsClientPingStopWatch = new Stopwatch();
 
_webSocketMessageBufferBlock = new BufferBlock<byte[]>(new DataflowBlockOptions { CancellationToken = _cancellationToken });
var webSocketTransformBlock = new TransformBlock<byte[], GotifyMessage>(bytes =>
_webSocketMessageBufferBlock = new BufferBlock<GotifyConnectionData>(
new DataflowBlockOptions
{
CancellationToken = _cancellationToken
});
 
var webSocketTransformBlock = new TransformBlock<GotifyConnectionData, GotifyMessage>(data =>
{
if (bytes.Length == 0)
if (data.Payload == null || data.Payload.Length == 0)
{
return null;
}
 
var message = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
 
GotifyMessage gotifyNotification;
 
try
{
var message = Encoding.UTF8.GetString(data.Payload, 0, data.Payload.Length);
 
gotifyNotification = JsonConvert.DeserializeObject<GotifyMessage>(message);
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize notification: {exception.Message}");
 
return null;
if (gotifyNotification == null)
{
throw new ArgumentNullException();
}
 
gotifyNotification.Server = data.Server;
}
 
if (gotifyNotification == null)
catch (Exception exception)
{
Log.Warning($"Could not deserialize gotify notification: {message}");
Log.Warning(exception, "Could not deserialize notification.");
 
return null;
}
@@ -109,45 +117,19 @@
 
var webSocketActionBlock = new ActionBlock<GotifyMessage>(async message =>
{
message.Server = _server;
 
var cachedImage = _applicationImageCache.Get($"{message.AppId}");
if (cachedImage is Stream cachedImageStream)
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken);
if (imageStream == null || imageStream.Length == 0)
{
using var cachedImageMemoryStream = new MemoryStream();
await cachedImageStream.CopyToAsync(cachedImageMemoryStream);
Log.Warning("Could not find any application image for notification.");
 
var cachedApplicationImage = new Bitmap(cachedImageMemoryStream);
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, cachedApplicationImage));
 
return;
}
 
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken);
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification");
return;
}
var image = new Bitmap(imageStream);
 
using var memoryStream = new MemoryStream();
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
 
await imageStream.CopyToAsync(memoryStream);
 
var imageBytes = memoryStream.ToArray();
 
_applicationImageCache.Add($"{message.AppId}", imageBytes,
new CacheItemPolicy
{
SlidingExpiration = TimeSpan.FromHours(1)
});
 
var image = new Bitmap(memoryStream);
 
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
 
Log.Debug($"Notification message received: {message.Message}");
 
}, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });
@@ -204,17 +186,12 @@
break;
}
 
try
if (!Uri.TryCreate(webSocketsUriBuilder.Uri, "stream", out var combinedUri))
{
webSocketsUriBuilder.Path = Path.Combine(webSocketsUriBuilder.Path, "stream");
Log.Error($"No WebSockets URL could be built from the provided URL {_server.Url}.");
}
catch (ArgumentException exception)
{
Log.Error(
$"No WebSockets URL could be built from the provided URL {_server.Url} due to {exception.Message}");
}
 
_webSocketsUri = webSocketsUriBuilder.Uri;
_webSocketsUri = combinedUri;
}
 
public void Dispose()
@@ -277,18 +254,11 @@
_cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token;
 
Connect();
 
if (_configuration.RetrievePastNotificationHours != 0)
if (_webSocketSharp != null)
{
_initTask = RetrievePastMessages(_cancellationToken);
return;
}
 
_runTask = HeartBeat(_cancellationToken);
}
 
private void Connect()
{
_webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri);
_webSocketSharp.EmitOnPing = true;
_webSocketSharp.WaitTime = TimeSpan.FromMinutes(1);
@@ -315,7 +285,13 @@
_webSocketSharp.OnOpen += WebSocketSharp_OnOpen;
_webSocketSharp.OnClose += WebSocketSharp_OnClose;
 
_webSocketSharp.ConnectAsync();
_webSocketSharp.Connect();
_heartBeatTask = HeartBeat(_cancellationToken);
 
if (_configuration.RetrievePastNotificationHours != 0)
{
_retrievePastMessagesTask = RetrievePastMessages(_cancellationToken);
}
}
 
private void WebSocketSharp_OnClose(object sender, CloseEventArgs e)
@@ -342,17 +318,11 @@
$"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}",
e.Exception);
 
if (_cancellationToken.IsCancellationRequested)
{
Stop();
return;
}
 
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken);
 
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
 
Connect();
await Stop().ContinueWith(task => Start(), CancellationToken.None);
}
 
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e)
@@ -362,18 +332,38 @@
Log.Information($"Server {_server} sent PING message");
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
 
return;
}
 
await _webSocketMessageBufferBlock.SendAsync(e.RawData, _cancellationToken);
await _webSocketMessageBufferBlock.SendAsync(new GotifyConnectionData(e.RawData, _server), _cancellationToken);
}
 
// Shuts the connection down: cancels the token source, awaits the heartbeat and
// past-message tasks, detaches the four WebSocket event handlers, closes the socket,
// then disposes and clears the token source.
// NOTE(review): this span comes from a revision comparison; the two signature lines and
// the two guard clauses below look like the old and new variants of the same code shown
// together — confirm against the repository which of them are live.
public void Stop()
public async Task Stop()
{
if (_cancellationTokenSource == null) return;

// Nothing to stop when there is no socket or no token source.
if (_webSocketSharp == null || _cancellationTokenSource == null)
{
return;
}

// Signal cancellation to everything observing the token.
_cancellationTokenSource.Cancel();

// NOTE(review): in the visible part of Connect(), _retrievePastMessagesTask is assigned only
// when RetrievePastNotificationHours != 0; if it is never assigned elsewhere, awaiting a null
// Task here throws NullReferenceException and the cleanup below is skipped — verify.
// NOTE(review): presumably these tasks can also end as cancelled, in which case the awaits
// would throw before the socket is closed — confirm how HeartBeat/RetrievePastMessages finish.
await _heartBeatTask;
await _retrievePastMessagesTask;

if (_webSocketSharp != null)
{
// Handlers are removed before Close() is called, so the OnClose handler is no longer
// attached at that point (presumably to avoid re-entering the reconnect path — TODO confirm).
_webSocketSharp.OnMessage -= WebSocketSharp_OnMessage;
_webSocketSharp.OnError -= WebSocketSharp_OnError;
_webSocketSharp.OnOpen -= WebSocketSharp_OnOpen;
_webSocketSharp.OnClose -= WebSocketSharp_OnClose;

_webSocketSharp.Close();
_webSocketSharp = null;
}

// Clearing the field makes a later Stop() return early at the guard above.
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}
 
#endregion
@@ -382,90 +372,63 @@
 
private async Task RetrievePastMessages(CancellationToken cancellationToken)
{
var messageUriBuilder = new UriBuilder(_httpUri);
 
var gotifyApplicationBufferBlock = new BufferBlock<GotifyApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken });
var gotifyApplicationActionBlock = new ActionBlock<GotifyApplication>(async application =>
var gotifyApplicationBufferBlock = new BufferBlock<GotifyConnectionApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken });
var gotifyApplicationActionBlock = new ActionBlock<GotifyConnectionApplication>(async gotifyConnectionApplication =>
{
try
if (!Uri.TryCreate(_httpUri, $"application/{gotifyConnectionApplication.Application.Id}/message", out var combinedUri))
{
messageUriBuilder.Path = Path.Combine(messageUriBuilder.Path, "application", $"{application.Id}",
"message");
}
catch (ArgumentException exception)
{
Log.Error($"No application URL could be built for {_server.Url} due to {exception.Message}");
Log.Error($"Could not get application message Uri {gotifyConnectionApplication.Application.Id}.");
 
return;
}
 
HttpResponseMessage messagesResponse;
GotifyMessageQuery gotifyMessageQuery;
try
{
messagesResponse = await _httpClient.GetAsync(messageUriBuilder.Uri, cancellationToken);
using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
using var streamReader = new StreamReader(messageStream);
using var jsonTextReader = new JsonTextReader(streamReader);
 
gotifyMessageQuery = _jsonSerializer.Deserialize<GotifyMessageQuery>(jsonTextReader);
}
catch (Exception exception)
catch (HttpRequestException exception)
{
Log.Error($"Could not get application {application.Id} due to {exception.Message}");
Log.Warning(exception, $"Could not get application {gotifyConnectionApplication.Application.Id}.");
 
return;
}
catch (JsonSerializationException exception)
{
Log.Warning(exception,$"Could not deserialize the message response for application {gotifyConnectionApplication.Application.Id}.");
 
return;
}
 
var messages = await messagesResponse.Content.ReadAsStringAsync();
 
GotifyMessageQuery gotifyMessageQuery;
try
if (gotifyMessageQuery == null || gotifyMessageQuery.Messages == null)
{
gotifyMessageQuery =
JsonConvert.DeserializeObject<GotifyMessageQuery>(messages);
}
catch (JsonSerializationException exception)
{
Log.Warning($"Could not deserialize the message response: {exception.Message}");
Log.Warning("Invalid application messages deserialized deserialized.");
 
return;
}
 
foreach (var message in gotifyMessageQuery.Messages.Where(message => message.Date >= DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours)))
foreach (var message in gotifyMessageQuery.Messages)
{
message.Server = _server;
 
var cachedImage = _applicationImageCache.Get($"{message.AppId}");
if (cachedImage is Stream cachedImageStream)
if (message.Date < DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours))
{
using var cachedImageMemoryStream = new MemoryStream();
await cachedImageStream.CopyToAsync(cachedImageMemoryStream);
 
var cachedApplicationImage = new Bitmap(cachedImageMemoryStream);
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, cachedApplicationImage));
 
return;
continue;
}
 
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken);
 
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification");
Log.Warning("Could not find any application image for notification.");
 
continue;
}
 
using var memoryStream = new MemoryStream();
var image = new Bitmap(imageStream);
message.Server = gotifyConnectionApplication.Server;
 
await imageStream.CopyToAsync(memoryStream);
 
var imageBytes = memoryStream.ToArray();
 
_applicationImageCache.Add($"{message.AppId}", imageBytes,
new CacheItemPolicy
{
SlidingExpiration = TimeSpan.FromHours(1)
});
 
var image = new Bitmap(memoryStream);
 
GotifyNotification?.Invoke(this,
new GotifyNotificationEventArgs(message, image));
}
@@ -475,11 +438,14 @@
gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock,
new DataflowLinkOptions { PropagateCompletion = true });
 
var tasks = new ConcurrentBag<Task>();
await foreach (var application in RetrieveGotifyApplications(cancellationToken))
{
await gotifyApplicationBufferBlock.SendAsync(application, cancellationToken);
var gotifyConnectionApplication = new GotifyConnectionApplication(_server, application);
tasks.Add(gotifyApplicationBufferBlock.SendAsync(gotifyConnectionApplication, cancellationToken));
}
 
await Task.WhenAll(tasks);
gotifyApplicationBufferBlock.Complete();
await gotifyApplicationActionBlock.Completion;
 
@@ -519,42 +485,30 @@
 
private async IAsyncEnumerable<GotifyApplication> RetrieveGotifyApplications([EnumeratorCancellation] CancellationToken cancellationToken)
{
var applicationsUriBuilder = new UriBuilder(_httpUri);
try
if (!Uri.TryCreate(_httpUri, "application", out var combinedUri))
{
applicationsUriBuilder.Path = Path.Combine(applicationsUriBuilder.Path, "application");
}
catch (ArgumentException exception)
{
Log.Error($"No application URL could be built for {_server.Url} due to {exception}");
Log.Error($"No application URL could be built for {_server.Url}.");
 
yield break;
}
 
HttpResponseMessage applicationsResponse;
 
GotifyApplication[] gotifyApplications;
try
{
applicationsResponse = await _httpClient.GetAsync(applicationsUriBuilder.Uri, cancellationToken);
}
catch (Exception exception)
{
Log.Error($"Could not retrieve applications: {exception.Message}");
using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
using var streamReader = new StreamReader(messageStream);
using var jsonTextReader = new JsonTextReader(streamReader);
 
yield break;
}
gotifyApplications = _jsonSerializer.Deserialize<GotifyApplication[]>(jsonTextReader);
 
var applications = await applicationsResponse.Content.ReadAsStringAsync();
 
GotifyApplication[] gotifyApplications;
try
{
gotifyApplications =
JsonConvert.DeserializeObject<GotifyApplication[]>(applications);
if (gotifyApplications == null)
{
throw new ArgumentNullException();
}
}
catch (JsonSerializationException exception)
catch (Exception exception)
{
Log.Warning($"Could not deserialize the list of applications from the server: {exception}");
Log.Warning(exception,"Could not deserialize the list of applications from the server.");
 
yield break;
}
@@ -569,39 +523,61 @@
{
await foreach (var application in RetrieveGotifyApplications(cancellationToken))
{
if (application.Id != appId) continue;
if (application.Id != appId)
{
continue;
}
 
if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, out var applicationImageUri))
{
Log.Warning("Could not build URL path to application icon");
 
continue;
}
 
HttpResponseMessage imageResponse;
 
try
{
imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken);
var imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken);
 
var memoryStream = new MemoryStream();
 
await imageResponse.Content.CopyToAsync(memoryStream);
 
return memoryStream;
}
catch (Exception exception)
{
Log.Error($"Could not retrieve application image: {exception.Message}");
 
return new MemoryStream();
}
}
 
var memoryStream = new MemoryStream();
return new MemoryStream();
}
 
await imageResponse.Content.CopyToAsync(memoryStream);
#endregion
 
memoryStream.Position = 0L;
// Carrier type that pairs a Gotify application with a server; RetrievePastMessages builds
// one per application and later copies its Server onto each retrieved message.
private class GotifyConnectionApplication
{
// The application carried by this instance.
public GotifyApplication Application { get; }
// The server carried alongside the application.
public Server Server { get; }

// NOTE(review): the next line does not belong to this class; it looks like a removed-side
// line from the revision comparison (old tail of the image-retrieval method) — confirm.
return memoryStream;
// Stores both constructor arguments in the read-only properties without validation.
public GotifyConnectionApplication(Server server, GotifyApplication application)
{
Server = server;
Application = application;
}
}
 
return new MemoryStream();
/// <summary>
/// Carrier type that pairs a raw WebSocket payload with a server, so the dataflow
/// pipeline can tag the deserialized message with the server it was received for.
/// </summary>
private class GotifyConnectionData
{
    /// <summary>
    /// Stores the given payload and server in the read-only properties; no validation
    /// or copying is performed.
    /// </summary>
    /// <param name="payload">Raw message bytes; may be null or empty.</param>
    /// <param name="server">Server to associate with the payload.</param>
    public GotifyConnectionData(byte[] payload, Server server)
    {
        this.Server = server;
        this.Payload = payload;
    }

    /// <summary>Raw message bytes exactly as passed to the constructor.</summary>
    public byte[] Payload { get; }

    /// <summary>Server passed to the constructor.</summary>
    public Server Server { get; }
}
 
#endregion
}
}