/trunk/Winify/Gotify/GotifyConnection.cs |
@@ -1,4 +1,5 @@ |
using System; |
using System.Collections.Concurrent; |
using System.Collections.Generic; |
using System.Diagnostics; |
using System.Drawing; |
@@ -18,6 +19,7 @@ |
using System.Threading.Tasks.Dataflow; |
using System.Windows.Forms; |
using Newtonsoft.Json; |
using Org.BouncyCastle.Asn1.Pkcs; |
using Serilog; |
using Servers; |
using WebSocketSharp; |
@@ -44,7 +46,7 @@ |
|
private CancellationTokenSource _cancellationTokenSource; |
|
private Task _runTask; |
private Task _heartBeatTask; |
|
private HttpClient _httpClient; |
|
@@ -53,16 +55,17 @@ |
private readonly Uri _httpUri; |
private WebSocket _webSocketSharp; |
private readonly Configuration.Configuration _configuration; |
private Task _initTask; |
private IDisposable _tplRetrievePastMessagesLink; |
private IDisposable _tplWebSocketsBufferBlockTransformLink; |
private IDisposable _tplWebSocketsTransformActionLink; |
private IDisposable _tplWebSocketsTransformActionNullLink; |
private readonly BufferBlock<byte[]> _webSocketMessageBufferBlock; |
private readonly BufferBlock<GotifyConnectionData> _webSocketMessageBufferBlock; |
private readonly Stopwatch _webSocketsClientPingStopWatch; |
private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation; |
|
private readonly MemoryCache _applicationImageCache; |
private Task _retrievePastMessagesTask; |
private static JsonSerializer _jsonSerializer; |
|
#endregion |
|
#region Constructors, Destructors and Finalizers |
@@ -69,36 +72,41 @@ |
|
private GotifyConnection() |
{ |
_applicationImageCache = new MemoryCache("GotifyApplicationImageCache"); |
_jsonSerializer = new JsonSerializer(); |
_webSocketsServerResponseScheduledContinuation = new ScheduledContinuation(); |
_webSocketsClientPingStopWatch = new Stopwatch(); |
|
_webSocketMessageBufferBlock = new BufferBlock<byte[]>(new DataflowBlockOptions { CancellationToken = _cancellationToken }); |
var webSocketTransformBlock = new TransformBlock<byte[], GotifyMessage>(bytes => |
_webSocketMessageBufferBlock = new BufferBlock<GotifyConnectionData>( |
new DataflowBlockOptions |
{ |
if (bytes.Length == 0) |
CancellationToken = _cancellationToken |
}); |
|
var webSocketTransformBlock = new TransformBlock<GotifyConnectionData, GotifyMessage>(data => |
{ |
if (data.Payload == null || data.Payload.Length == 0) |
{ |
return null; |
} |
|
var message = Encoding.UTF8.GetString(bytes, 0, bytes.Length); |
|
GotifyMessage gotifyNotification; |
|
try |
{ |
var message = Encoding.UTF8.GetString(data.Payload, 0, data.Payload.Length); |
|
gotifyNotification = JsonConvert.DeserializeObject<GotifyMessage>(message); |
|
if (gotifyNotification == null) |
{ |
throw new ArgumentNullException(); |
} |
catch (JsonSerializationException exception) |
{ |
Log.Warning($"Could not deserialize notification: {exception.Message}"); |
|
return null; |
gotifyNotification.Server = data.Server; |
} |
|
if (gotifyNotification == null) |
catch (Exception exception) |
{ |
Log.Warning($"Could not deserialize gotify notification: {message}"); |
Log.Warning(exception, "Could not deserialize notification."); |
|
return null; |
} |
@@ -109,42 +117,16 @@ |
|
var webSocketActionBlock = new ActionBlock<GotifyMessage>(async message => |
{ |
message.Server = _server; |
|
var cachedImage = _applicationImageCache.Get($"{message.AppId}"); |
if (cachedImage is Stream cachedImageStream) |
{ |
using var cachedImageMemoryStream = new MemoryStream(); |
await cachedImageStream.CopyToAsync(cachedImageMemoryStream); |
|
var cachedApplicationImage = new Bitmap(cachedImageMemoryStream); |
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, cachedApplicationImage)); |
|
return; |
} |
|
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken); |
if (imageStream == null || imageStream.Length == 0) |
{ |
Log.Warning("Could not find any application image for notification"); |
Log.Warning("Could not find any application image for notification."); |
|
return; |
} |
|
using var memoryStream = new MemoryStream(); |
var image = new Bitmap(imageStream); |
|
await imageStream.CopyToAsync(memoryStream); |
|
var imageBytes = memoryStream.ToArray(); |
|
_applicationImageCache.Add($"{message.AppId}", imageBytes, |
new CacheItemPolicy |
{ |
SlidingExpiration = TimeSpan.FromHours(1) |
}); |
|
var image = new Bitmap(memoryStream); |
|
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, image)); |
|
@@ -204,17 +186,12 @@ |
break; |
} |
|
try |
if (!Uri.TryCreate(webSocketsUriBuilder.Uri, "stream", out var combinedUri)) |
{ |
webSocketsUriBuilder.Path = Path.Combine(webSocketsUriBuilder.Path, "stream"); |
Log.Error($"No WebSockets URL could be built from the provided URL {_server.Url}."); |
} |
catch (ArgumentException exception) |
{ |
Log.Error( |
$"No WebSockets URL could be built from the provided URL {_server.Url} due to {exception.Message}"); |
} |
|
_webSocketsUri = webSocketsUriBuilder.Uri; |
_webSocketsUri = combinedUri; |
} |
|
public void Dispose() |
@@ -277,18 +254,11 @@ |
_cancellationTokenSource = new CancellationTokenSource(); |
_cancellationToken = _cancellationTokenSource.Token; |
|
Connect(); |
|
if (_configuration.RetrievePastNotificationHours != 0) |
if (_webSocketSharp != null) |
{ |
_initTask = RetrievePastMessages(_cancellationToken); |
return; |
} |
|
_runTask = HeartBeat(_cancellationToken); |
} |
|
private void Connect() |
{ |
_webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri); |
_webSocketSharp.EmitOnPing = true; |
_webSocketSharp.WaitTime = TimeSpan.FromMinutes(1); |
@@ -315,8 +285,14 @@ |
_webSocketSharp.OnOpen += WebSocketSharp_OnOpen; |
_webSocketSharp.OnClose += WebSocketSharp_OnClose; |
|
_webSocketSharp.ConnectAsync(); |
_webSocketSharp.Connect(); |
_heartBeatTask = HeartBeat(_cancellationToken); |
|
if (_configuration.RetrievePastNotificationHours != 0) |
{ |
_retrievePastMessagesTask = RetrievePastMessages(_cancellationToken); |
} |
} |
|
private void WebSocketSharp_OnClose(object sender, CloseEventArgs e) |
{ |
@@ -342,17 +318,11 @@ |
$"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}", |
e.Exception); |
|
if (_cancellationToken.IsCancellationRequested) |
{ |
Stop(); |
return; |
} |
|
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken); |
|
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
|
Connect(); |
await Stop().ContinueWith(task => Start(), CancellationToken.None); |
} |
|
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e) |
@@ -362,20 +332,40 @@ |
Log.Information($"Server {_server} sent PING message"); |
|
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken); |
|
return; |
} |
|
await _webSocketMessageBufferBlock.SendAsync(e.RawData, _cancellationToken); |
await _webSocketMessageBufferBlock.SendAsync(new GotifyConnectionData(e.RawData, _server), _cancellationToken); |
} |
|
public void Stop() |
public async Task Stop() |
{ |
if (_cancellationTokenSource == null) return; |
|
if (_webSocketSharp == null || _cancellationTokenSource == null) |
{ |
return; |
} |
|
_cancellationTokenSource.Cancel(); |
|
await _heartBeatTask; |
await _retrievePastMessagesTask; |
|
if (_webSocketSharp != null) |
{ |
_webSocketSharp.OnMessage -= WebSocketSharp_OnMessage; |
_webSocketSharp.OnError -= WebSocketSharp_OnError; |
_webSocketSharp.OnOpen -= WebSocketSharp_OnOpen; |
_webSocketSharp.OnClose -= WebSocketSharp_OnClose; |
|
_webSocketSharp.Close(); |
_webSocketSharp = null; |
} |
|
_cancellationTokenSource.Dispose(); |
_cancellationTokenSource = null; |
} |
|
#endregion |
|
#region Private Methods |
@@ -382,90 +372,63 @@ |
|
private async Task RetrievePastMessages(CancellationToken cancellationToken) |
{ |
var messageUriBuilder = new UriBuilder(_httpUri); |
|
var gotifyApplicationBufferBlock = new BufferBlock<GotifyApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken }); |
var gotifyApplicationActionBlock = new ActionBlock<GotifyApplication>(async application => |
var gotifyApplicationBufferBlock = new BufferBlock<GotifyConnectionApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken }); |
var gotifyApplicationActionBlock = new ActionBlock<GotifyConnectionApplication>(async gotifyConnectionApplication => |
{ |
try |
if (!Uri.TryCreate(_httpUri, $"application/{gotifyConnectionApplication.Application.Id}/message", out var combinedUri)) |
{ |
messageUriBuilder.Path = Path.Combine(messageUriBuilder.Path, "application", $"{application.Id}", |
"message"); |
} |
catch (ArgumentException exception) |
{ |
Log.Error($"No application URL could be built for {_server.Url} due to {exception.Message}"); |
Log.Error($"Could not get application message Uri {gotifyConnectionApplication.Application.Id}."); |
|
return; |
} |
|
HttpResponseMessage messagesResponse; |
GotifyMessageQuery gotifyMessageQuery; |
try |
{ |
messagesResponse = await _httpClient.GetAsync(messageUriBuilder.Uri, cancellationToken); |
using var messageStream = await _httpClient.GetStreamAsync(combinedUri); |
using var streamReader = new StreamReader(messageStream); |
using var jsonTextReader = new JsonTextReader(streamReader); |
|
gotifyMessageQuery = _jsonSerializer.Deserialize<GotifyMessageQuery>(jsonTextReader); |
} |
catch (Exception exception) |
catch (HttpRequestException exception) |
{ |
Log.Error($"Could not get application {application.Id} due to {exception.Message}"); |
Log.Warning(exception, $"Could not get application {gotifyConnectionApplication.Application.Id}."); |
|
return; |
} |
|
|
var messages = await messagesResponse.Content.ReadAsStringAsync(); |
|
GotifyMessageQuery gotifyMessageQuery; |
try |
{ |
gotifyMessageQuery = |
JsonConvert.DeserializeObject<GotifyMessageQuery>(messages); |
} |
catch (JsonSerializationException exception) |
{ |
Log.Warning($"Could not deserialize the message response: {exception.Message}"); |
Log.Warning(exception,$"Could not deserialize the message response for application {gotifyConnectionApplication.Application.Id}."); |
|
return; |
} |
|
foreach (var message in gotifyMessageQuery.Messages.Where(message => message.Date >= DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours))) |
if (gotifyMessageQuery == null || gotifyMessageQuery.Messages == null) |
{ |
message.Server = _server; |
Log.Warning("Invalid application messages deserialized deserialized."); |
|
var cachedImage = _applicationImageCache.Get($"{message.AppId}"); |
if (cachedImage is Stream cachedImageStream) |
{ |
using var cachedImageMemoryStream = new MemoryStream(); |
await cachedImageStream.CopyToAsync(cachedImageMemoryStream); |
|
var cachedApplicationImage = new Bitmap(cachedImageMemoryStream); |
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, cachedApplicationImage)); |
|
return; |
} |
|
foreach (var message in gotifyMessageQuery.Messages) |
{ |
if (message.Date < DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours)) |
{ |
continue; |
} |
|
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken); |
|
if (imageStream == null || imageStream.Length == 0) |
{ |
Log.Warning("Could not find any application image for notification"); |
Log.Warning("Could not find any application image for notification."); |
|
continue; |
} |
|
using var memoryStream = new MemoryStream(); |
var image = new Bitmap(imageStream); |
message.Server = gotifyConnectionApplication.Server; |
|
await imageStream.CopyToAsync(memoryStream); |
|
var imageBytes = memoryStream.ToArray(); |
|
_applicationImageCache.Add($"{message.AppId}", imageBytes, |
new CacheItemPolicy |
{ |
SlidingExpiration = TimeSpan.FromHours(1) |
}); |
|
var image = new Bitmap(memoryStream); |
|
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, image)); |
} |
@@ -475,11 +438,14 @@ |
gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock, |
new DataflowLinkOptions { PropagateCompletion = true }); |
|
var tasks = new ConcurrentBag<Task>(); |
await foreach (var application in RetrieveGotifyApplications(cancellationToken)) |
{ |
await gotifyApplicationBufferBlock.SendAsync(application, cancellationToken); |
var gotifyConnectionApplication = new GotifyConnectionApplication(_server, application); |
tasks.Add(gotifyApplicationBufferBlock.SendAsync(gotifyConnectionApplication, cancellationToken)); |
} |
|
await Task.WhenAll(tasks); |
gotifyApplicationBufferBlock.Complete(); |
await gotifyApplicationActionBlock.Completion; |
|
@@ -519,42 +485,30 @@ |
|
private async IAsyncEnumerable<GotifyApplication> RetrieveGotifyApplications([EnumeratorCancellation] CancellationToken cancellationToken) |
{ |
var applicationsUriBuilder = new UriBuilder(_httpUri); |
try |
if (!Uri.TryCreate(_httpUri, "application", out var combinedUri)) |
{ |
applicationsUriBuilder.Path = Path.Combine(applicationsUriBuilder.Path, "application"); |
} |
catch (ArgumentException exception) |
{ |
Log.Error($"No application URL could be built for {_server.Url} due to {exception}"); |
Log.Error($"No application URL could be built for {_server.Url}."); |
|
yield break; |
} |
|
HttpResponseMessage applicationsResponse; |
|
GotifyApplication[] gotifyApplications; |
try |
{ |
applicationsResponse = await _httpClient.GetAsync(applicationsUriBuilder.Uri, cancellationToken); |
} |
catch (Exception exception) |
{ |
Log.Error($"Could not retrieve applications: {exception.Message}"); |
using var messageStream = await _httpClient.GetStreamAsync(combinedUri); |
using var streamReader = new StreamReader(messageStream); |
using var jsonTextReader = new JsonTextReader(streamReader); |
|
yield break; |
} |
gotifyApplications = _jsonSerializer.Deserialize<GotifyApplication[]>(jsonTextReader); |
|
var applications = await applicationsResponse.Content.ReadAsStringAsync(); |
|
GotifyApplication[] gotifyApplications; |
try |
if (gotifyApplications == null) |
{ |
gotifyApplications = |
JsonConvert.DeserializeObject<GotifyApplication[]>(applications); |
throw new ArgumentNullException(); |
} |
catch (JsonSerializationException exception) |
} |
catch (Exception exception) |
{ |
Log.Warning($"Could not deserialize the list of applications from the server: {exception}"); |
Log.Warning(exception,"Could not deserialize the list of applications from the server."); |
|
yield break; |
} |
@@ -569,39 +523,61 @@ |
{ |
await foreach (var application in RetrieveGotifyApplications(cancellationToken)) |
{ |
if (application.Id != appId) continue; |
if (application.Id != appId) |
{ |
continue; |
} |
|
if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, out var applicationImageUri)) |
{ |
Log.Warning("Could not build URL path to application icon"); |
|
continue; |
} |
|
HttpResponseMessage imageResponse; |
|
try |
{ |
imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken); |
var imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken); |
|
var memoryStream = new MemoryStream(); |
|
await imageResponse.Content.CopyToAsync(memoryStream); |
|
return memoryStream; |
} |
catch (Exception exception) |
{ |
Log.Error($"Could not retrieve application image: {exception.Message}"); |
} |
} |
|
return new MemoryStream(); |
} |
|
var memoryStream = new MemoryStream(); |
#endregion |
|
await imageResponse.Content.CopyToAsync(memoryStream); |
private class GotifyConnectionApplication |
{ |
public GotifyApplication Application { get; } |
public Server Server { get; } |
|
memoryStream.Position = 0L; |
|
return memoryStream; |
public GotifyConnectionApplication(Server server, GotifyApplication application) |
{ |
Server = server; |
Application = application; |
} |
|
return new MemoryStream(); |
} |
|
#endregion |
private class GotifyConnectionData |
{ |
public byte[] Payload { get; } |
public Server Server { get; } |
|
public GotifyConnectionData(byte[] payload, Server server) |
{ |
Payload = payload; |
Server = server; |
} |
} |
} |
} |