/trunk/Winify/Gotify/GotifyConnection.cs |
@@ -1,19 +1,28 @@ |
using System; |
using System.Collections.Generic; |
using System.Diagnostics; |
using System.Drawing; |
using System.Globalization; |
using System.IO; |
using System.Linq; |
using System.Net; |
using System.Net.Http; |
using System.Net.Http.Headers; |
using System.Runtime.Caching; |
using System.Runtime.CompilerServices; |
using System.Security.Authentication; |
using System.Security.Cryptography.X509Certificates; |
using System.Text; |
using System.Threading; |
using System.Threading.Tasks; |
using System.Threading.Tasks.Dataflow; |
using System.Windows.Forms; |
using Newtonsoft.Json; |
using Serilog; |
using Servers; |
using WebSocketSharp; |
using WebSocketSharp.Net; |
using Winify.Utilities; |
using ErrorEventArgs = WebSocketSharp.ErrorEventArgs; |
using NetworkCredential = System.Net.NetworkCredential; |
|
@@ -45,7 +54,15 @@ |
        // WebSocket client; a new instance is created by Connect(). |
        private WebSocket _webSocketSharp; |
        // Application configuration; read for proxy settings and RetrievePastNotificationHours. |
        private readonly Configuration.Configuration _configuration; |
        // Task returned by RetrievePastMessages when past messages are fetched at start-up. |
        private Task _initTask; |
        // Disposed during cleanup. NOTE(review): no assignment is visible in this part of the file - confirm it is ever set. |
        private IDisposable _tplRetrievePastMessagesLink; |
        // Link handles returned by LinkTo in the private constructor: |
        // buffer block -> transform block, transform block -> action block (non-null messages), |
        // and transform block -> null target (everything the filtered link did not accept). |
        private IDisposable _tplWebSocketsBufferBlockTransformLink; |
        private IDisposable _tplWebSocketsTransformActionLink; |
        private IDisposable _tplWebSocketsTransformActionNullLink; |
        // Head of the dataflow pipeline; WebSocketSharp_OnMessage sends e.RawData into it. |
        private readonly BufferBlock<byte[]> _webSocketMessageBufferBlock; |
        // Restarted before each client Ping() in HeartBeat; its elapsed milliseconds are logged as latency. |
        private readonly Stopwatch _webSocketsClientPingStopWatch; |
        // Scheduled with OnUnresponsiveServer and a one-minute TimeSpan on open, on server ping and after a successful client ping. |
        private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation; |

        // Application images keyed by the AppId rendered as a string; entries are added with a one-hour sliding expiration. |
        private readonly MemoryCache _applicationImageCache; |
#endregion |
|
#region Constructors, Destructors and Finalizers |
@@ -52,6 +69,86 @@ |
|
private GotifyConnection() |
{ |
_applicationImageCache = new MemoryCache("GotifyApplicationImageCache"); |
_webSocketsServerResponseScheduledContinuation = new ScheduledContinuation(); |
_webSocketsClientPingStopWatch = new Stopwatch(); |
|
_webSocketMessageBufferBlock = new BufferBlock<byte[]>(new DataflowBlockOptions { CancellationToken = _cancellationToken }); |
var webSocketTransformBlock = new TransformBlock<byte[], GotifyMessage>(bytes => |
{ |
if (bytes.Length == 0) |
{ |
return null; |
} |
|
var message = Encoding.UTF8.GetString(bytes, 0, bytes.Length); |
|
GotifyMessage gotifyNotification; |
|
try |
{ |
gotifyNotification = JsonConvert.DeserializeObject<GotifyMessage>(message); |
} |
catch (JsonSerializationException exception) |
{ |
Log.Warning($"Could not deserialize notification: {exception.Message}"); |
|
return null; |
} |
|
if (gotifyNotification == null) |
{ |
Log.Warning($"Could not deserialize gotify notification: {message}"); |
|
return null; |
} |
|
return gotifyNotification; |
|
}, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken }); |
|
var webSocketActionBlock = new ActionBlock<GotifyMessage>(async message => |
{ |
message.Server = _server; |
|
var cachedImage = _applicationImageCache.Get($"{message.AppId}"); |
if (cachedImage is Image applicationImage) |
{ |
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, applicationImage)); |
return; |
} |
|
using (var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken)) |
{ |
if (imageStream == null || imageStream.Length == 0) |
{ |
Log.Warning("Could not find any application image for notification"); |
return; |
} |
|
var image = Image.FromStream(imageStream); |
|
_applicationImageCache.Add($"{message.AppId}", image.Clone(), |
new CacheItemPolicy |
{ |
SlidingExpiration = TimeSpan.FromHours(1) |
}); |
|
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, image)); |
} |
|
Log.Debug($"Notification message received: {message.Message}"); |
|
}, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken }); |
|
_tplWebSocketsBufferBlockTransformLink = _webSocketMessageBufferBlock.LinkTo(webSocketTransformBlock, |
new DataflowLinkOptions { PropagateCompletion = true }); |
_tplWebSocketsTransformActionLink = webSocketTransformBlock.LinkTo(webSocketActionBlock, |
new DataflowLinkOptions { PropagateCompletion = true }, message => message != null); |
_tplWebSocketsTransformActionNullLink = webSocketTransformBlock.LinkTo(DataflowBlock.NullTarget<GotifyMessage>(), |
new DataflowLinkOptions() { PropagateCompletion = true }); |
} |
|
public GotifyConnection(Server server, Configuration.Configuration configuration) : this() |
@@ -119,6 +216,30 @@ |
_cancellationTokenSource = null; |
} |
|
if (_tplWebSocketsBufferBlockTransformLink != null) |
{ |
_tplWebSocketsBufferBlockTransformLink.Dispose(); |
_tplWebSocketsBufferBlockTransformLink = null; |
} |
|
if (_tplWebSocketsTransformActionLink != null) |
{ |
_tplWebSocketsTransformActionLink.Dispose(); |
_tplWebSocketsTransformActionLink = null; |
} |
|
if (_tplWebSocketsTransformActionNullLink != null) |
{ |
_tplWebSocketsTransformActionNullLink.Dispose(); |
_tplWebSocketsTransformActionNullLink = null; |
} |
|
if (_tplRetrievePastMessagesLink != null) |
{ |
_tplRetrievePastMessagesLink.Dispose(); |
_tplRetrievePastMessagesLink = null; |
} |
|
if (_webSocketSharp != null) |
{ |
_webSocketSharp.Close(); |
@@ -154,12 +275,14 @@ |
_initTask = RetrievePastMessages(_cancellationToken); |
} |
|
_runTask = Run(_cancellationToken); |
_runTask = HeartBeat(_cancellationToken); |
} |
|
private void Connect() |
{ |
_webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri); |
_webSocketSharp.EmitOnPing = true; |
_webSocketSharp.WaitTime = TimeSpan.FromMinutes(1); |
_webSocketSharp.SslConfiguration = new ClientSslConfiguration(_webSocketsUri.Host, |
new X509CertificateCollection(new X509Certificate[] { }), SslProtocols.Tls12, false); |
if (_configuration.Proxy.Enable) |
@@ -195,8 +318,15 @@ |
private void WebSocketSharp_OnOpen(object sender, EventArgs e) |
{ |
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open"); |
|
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken); |
} |
|
private void OnUnresponsiveServer() |
{ |
Log.Warning($"Server {_server} has not responded in a long while..."); |
} |
|
private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e) |
{ |
Log.Error( |
@@ -210,6 +340,7 @@ |
} |
|
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken); |
|
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
|
Connect(); |
@@ -217,64 +348,16 @@ |
|
        // Handler for messages (and, with EmitOnPing enabled in Connect, pings) from the WebSocket. |
        // NOTE(review): this span reads like the removed and the added version of the handler merged |
        // together - an older inline path (empty check, deserialize, image fetch, event dispatch) next |
        // to a newer path (ping handling, hand-off to the buffer block). Confirm which lines are live |
        // before relying on the exact statement order below. |
        private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e) |
        { |
            if (e.RawData.Length == 0) |
            if (e.IsPing) |
            { |
                Log.Warning("Empty message received from server"); |
                return; |
            } |
                // Ping path: log the ping; the Schedule call two statements below passes OnUnresponsiveServer again. |
                Log.Information($"Server {_server} sent PING message"); |
|
            var message = Encoding.UTF8.GetString(e.RawData, 0, e.RawData.Length); |
                _webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken); |
|
            GotifyMessage gotifyNotification; |
|
            try |
            { |
                gotifyNotification = JsonConvert.DeserializeObject<GotifyMessage>(message); |
            } |
            catch (JsonSerializationException exception) |
            { |
                Log.Warning($"Could not deserialize notification: {exception.Message}"); |
                return; |
            } |
|
            if (gotifyNotification == null) |
            { |
                Log.Warning($"Could not deserialize gotify notification: {message}"); |
|
                return; |
            } |
|
            gotifyNotification.Server = _server; |
|
            var applicationUriBuilder = new UriBuilder(_httpUri); |
            try |
            { |
                applicationUriBuilder.Path = Path.Combine(applicationUriBuilder.Path, "application"); |
            } |
            catch (ArgumentException exception) |
            { |
                Log.Warning("Could not build an URI to an application"); |
|
                return; |
            } |
|
            // NOTE(review): this call passes three arguments, whereas the RetrieveGotifyApplicationImage |
            // visible later in the file takes (appId, cancellationToken) - presumably the older overload. |
            using (var imageStream = |
                await RetrieveGotifyApplicationImage(gotifyNotification.AppId, applicationUriBuilder.Uri, |
                    _cancellationToken)) |
            { |
                if (imageStream == null) |
                { |
                    Log.Warning("Could not find any application image for notification"); |
                    return; |
                } |
|
                var image = Image.FromStream(imageStream); |
|
                GotifyNotification?.Invoke(this, |
                    new GotifyNotificationEventArgs(gotifyNotification, image)); |
            } |
|
            Log.Debug($"Notification message received: {gotifyNotification.Message}"); |
            // Sends the raw payload into the buffer block at the head of the dataflow pipeline. |
            await _webSocketMessageBufferBlock.SendAsync(e.RawData, _cancellationToken); |
        } |
|
public void Stop() |
@@ -291,7 +374,9 @@ |
private async Task RetrievePastMessages(CancellationToken cancellationToken) |
{ |
var messageUriBuilder = new UriBuilder(_httpUri); |
foreach (var application in await RetrieveGotifyApplications(cancellationToken)) |
|
var gotifyApplicationBufferBlock = new BufferBlock<GotifyApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken }); |
var gotifyApplicationActionBlock = new ActionBlock<GotifyApplication>(async application => |
{ |
try |
{ |
@@ -302,12 +387,22 @@ |
{ |
Log.Error($"No application URL could be built for {_server.Url} due to {exception.Message}"); |
|
continue; |
return; |
} |
|
var messagesResponse = await _httpClient.GetAsync(messageUriBuilder.Uri, cancellationToken); |
HttpResponseMessage messagesResponse; |
try |
{ |
messagesResponse = await _httpClient.GetAsync(messageUriBuilder.Uri, cancellationToken); |
} |
catch (Exception exception) |
{ |
Log.Error($"Could not get application {application.Id} due to {exception.Message}"); |
|
return; |
} |
|
|
var messages = await messagesResponse.Content.ReadAsStringAsync(); |
|
GotifyMessageQuery gotifyMessageQuery; |
@@ -320,54 +415,75 @@ |
{ |
Log.Warning($"Could not deserialize the message response: {exception.Message}"); |
|
continue; |
return; |
} |
|
var applicationUriBuilder = new UriBuilder(_httpUri); |
try |
foreach (var message in gotifyMessageQuery.Messages.Where(message => message.Date >= DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours))) |
{ |
applicationUriBuilder.Path = Path.Combine(applicationUriBuilder.Path, "application"); |
} |
catch (ArgumentException exception) |
{ |
Log.Warning($"Could not build an URI to an application: {exception}"); |
message.Server = _server; |
|
return; |
} |
var cachedImage = _applicationImageCache.Get($"{message.AppId}"); |
if (cachedImage is Image applicationImage) |
{ |
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, applicationImage)); |
return; |
} |
|
foreach (var message in gotifyMessageQuery.Messages) |
{ |
if (message.Date < DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours)) |
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId, _cancellationToken); |
if (imageStream == null || imageStream.Length == 0) |
{ |
Log.Warning("Could not find any application image for notification"); |
continue; |
} |
|
message.Server = _server; |
var image = Image.FromStream(imageStream); |
|
using (var imageStream = |
await RetrieveGotifyApplicationImage(message.AppId, applicationUriBuilder.Uri, |
_cancellationToken)) |
{ |
if (imageStream == null) |
_applicationImageCache.Add($"{message.AppId}", image.Clone(), |
new CacheItemPolicy |
{ |
Log.Warning("Could not find any application image for notification"); |
return; |
} |
SlidingExpiration = TimeSpan.FromHours(1) |
}); |
|
var image = Image.FromStream(imageStream); |
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, image)); |
} |
|
GotifyNotification?.Invoke(this, |
new GotifyNotificationEventArgs(message, image)); |
} |
} |
}, new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken }); |
|
gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock, |
new DataflowLinkOptions { PropagateCompletion = true }); |
|
await foreach (var application in RetrieveGotifyApplications(cancellationToken)) |
{ |
await gotifyApplicationBufferBlock.SendAsync(application, cancellationToken); |
} |
|
gotifyApplicationBufferBlock.Complete(); |
await gotifyApplicationActionBlock.Completion; |
|
} |
|
private async Task Run(CancellationToken cancellationToken) |
private async Task HeartBeat(CancellationToken cancellationToken) |
{ |
try |
{ |
do |
{ |
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); |
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); |
|
_webSocketsClientPingStopWatch.Restart(); |
if (!_webSocketSharp.Ping()) |
{ |
Log.Warning($"Server {_server} did not respond to PING message."); |
continue; |
} |
|
var delta = _webSocketsClientPingStopWatch.ElapsedMilliseconds; |
|
Log.Information($"PING response latency for {_server} is {delta}ms"); |
|
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken); |
} while (!cancellationToken.IsCancellationRequested); |
} |
catch (Exception exception) when (exception is OperationCanceledException || |
@@ -376,11 +492,11 @@ |
} |
catch (Exception exception) |
{ |
Log.Warning(exception, "Failure running connection loop"); |
Log.Warning(exception, $"Heartbeat for server {_server} has failed due to {exception.Message}"); |
} |
} |
|
private async Task<GotifyApplication[]> RetrieveGotifyApplications(CancellationToken cancellationToken) |
private async IAsyncEnumerable<GotifyApplication> RetrieveGotifyApplications([EnumeratorCancellation] CancellationToken cancellationToken) |
{ |
var applicationsUriBuilder = new UriBuilder(_httpUri); |
try |
@@ -390,35 +506,25 @@ |
catch (ArgumentException exception) |
{ |
Log.Error($"No application URL could be built for {_server.Url} due to {exception}"); |
|
yield break; |
} |
|
var applicationsResponse = await _httpClient.GetAsync(applicationsUriBuilder.Uri, cancellationToken); |
HttpResponseMessage applicationsResponse; |
|
var applications = await applicationsResponse.Content.ReadAsStringAsync(); |
|
GotifyApplication[] gotifyApplications; |
try |
{ |
gotifyApplications = |
JsonConvert.DeserializeObject<GotifyApplication[]>(applications); |
applicationsResponse = await _httpClient.GetAsync(applicationsUriBuilder.Uri, cancellationToken); |
} |
catch (JsonSerializationException exception) |
catch (Exception exception) |
{ |
Log.Warning($"Could not deserialize the list of applications from the server: {exception}"); |
Log.Error($"Could not retrieve applications: {exception.Message}"); |
|
return null; |
yield break; |
} |
|
return gotifyApplications; |
} |
var applications = await applicationsResponse.Content.ReadAsStringAsync(); |
|
private async Task<Stream> RetrieveGotifyApplicationImage(int appId, Uri applicationUri, |
CancellationToken cancellationToken) |
{ |
var applicationResponse = await _httpClient.GetAsync(applicationUri, cancellationToken); |
|
var applications = await applicationResponse.Content.ReadAsStringAsync(); |
|
GotifyApplication[] gotifyApplications; |
try |
{ |
@@ -427,32 +533,52 @@ |
} |
catch (JsonSerializationException exception) |
{ |
Log.Warning($"Could not deserialize the list of applications from the server: {exception.Message}"); |
Log.Warning($"Could not deserialize the list of applications from the server: {exception}"); |
|
return null; |
yield break; |
} |
|
foreach (var application in gotifyApplications) |
{ |
yield return application; |
} |
} |
|
        // Downloads the image of the application whose Id equals appId and returns it as a stream. |
        // Enumerates RetrieveGotifyApplications, skips non-matching entries, builds the image URI from |
        // _httpUri and application.Image, issues a GET and copies the response body into a MemoryStream |
        // rewound to position 0. |
        // NOTE(review): several statements appear twice in slightly different forms (the Uri.TryCreate |
        // condition, the imageResponse declaration, the final return) - this looks like removed and |
        // added lines shown together; confirm which variant is live. |
        private async Task<Stream> RetrieveGotifyApplicationImage(int appId, CancellationToken cancellationToken) |
        { |
            await foreach (var application in RetrieveGotifyApplications(cancellationToken)) |
            { |
                if (application.Id != appId) continue; |
|
                if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, |
                    out var applicationImageUri)) |
                if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, out var applicationImageUri)) |
                { |
                    Log.Warning("Could not build URL path to application icon"); |
                    continue; |
                } |
|
                var imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken); |
                HttpResponseMessage imageResponse; |
|
                try |
                { |
                    imageResponse = await _httpClient.GetAsync(applicationImageUri, cancellationToken); |
                } |
                catch (Exception exception) |
                { |
                    Log.Error($"Could not retrieve application image: {exception.Message}"); |
|
                    // An empty stream is returned on failure; the callers visible in this file check imageStream.Length == 0. |
                    return new MemoryStream(); |
                } |
|
                var memoryStream = new MemoryStream(); |
|
                await imageResponse.Content.CopyToAsync(memoryStream); |
|
                // Rewind so the caller reads from the first byte. |
                memoryStream.Position = 0L; |
|
                return memoryStream; |
            } |
|
            // Reached when the enumeration ends without returning a stream for appId. |
            return null; |
            return new MemoryStream(); |
        } |
|
#endregion |