Winify

Subversion Repositories:
Compare Path: Rev
With Path: Rev
?path1? @ HEAD  →  ?path2? @ 1
/trunk/Winify/Gotify/GotifyConnection.cs
@@ -1,6 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
using System.IO;
@@ -7,21 +5,12 @@
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Newtonsoft.Json;
using Serilog;
using Servers;
using WebSocketSharp;
using WebSocketSharp.Net;
using Winify.Utilities;
using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
using NetworkCredential = System.Net.NetworkCredential;
using ClientWebSocket = System.Net.WebSockets.Managed.ClientWebSocket;
 
namespace Winify.Gotify
{
@@ -29,177 +18,40 @@
{
#region Public Events & Delegates
 
public event EventHandler<GotifyMessageEventArgs> GotifyMessage;
public event EventHandler<GotifyNotificationEventArgs> GotifyNotification;
 
#endregion
 
#region Private Delegates, Events, Enums, Properties, Indexers and Fields
 
private readonly Server _server;
 
private CancellationToken _cancellationToken;
 
private CancellationTokenSource _cancellationTokenSource;
 
private Task _heartBeatTask;
 
private HttpClient _httpClient;
 
private readonly Uri _webSocketsUri;
private Task _runTask;
 
private readonly Uri _httpUri;
private WebSocket _webSocketSharp;
private readonly Configuration.Configuration _configuration;
private IDisposable _tplRetrievePastMessagesLink;
private IDisposable _tplWebSocketsBufferBlockTransformLink;
private readonly BufferBlock<GotifyConnectionData> _webSocketMessageBufferBlock;
private readonly Stopwatch _webSocketsClientPingStopWatch;
private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation;
private Task _retrievePastMessagesTask;
private static JsonSerializer _jsonSerializer;
private readonly CancellationToken _programCancellationToken;
private CancellationTokenSource _localCancellationTokenSource;
private CancellationToken _localCancellationToken;
private ClientWebSocket _webSocketClient;
 
#endregion
 
#region Constructors, Destructors and Finalizers
 
/// <summary>
/// Sets up the state that does not depend on a particular server: the JSON serializer,
/// the ping bookkeeping helpers and the dataflow pipeline that turns raw WebSocket
/// payloads into Gotify message events.
/// </summary>
private GotifyConnection()
{
    _jsonSerializer = new JsonSerializer();
    _webSocketsServerResponseScheduledContinuation = new ScheduledContinuation();
    _webSocketsClientPingStopWatch = new Stopwatch();

    // NOTE(review): _cancellationToken does not appear to be assigned before this
    // constructor runs, so both blocks below would receive the default token and
    // would not be cancellable through it - confirm against the start-up sequence.
    _webSocketMessageBufferBlock = new BufferBlock<GotifyConnectionData>(
        new DataflowBlockOptions
        {
            CancellationToken = _cancellationToken
        });

    // Consumer side of the pipeline: deserialize the payload, fetch the icon of the
    // sending application and raise the GotifyMessage event.
    var webSocketActionBlock = new ActionBlock<GotifyConnectionData>(async gotifyConnectionData =>
    {
        try
        {
            using var memoryStream = new MemoryStream(gotifyConnectionData.Payload);
            using var streamReader = new StreamReader(memoryStream);
            using var jsonTextReader = new JsonTextReader(streamReader);

            // A null result is routed to the generic catch below by throwing here.
            var gotifyMessage = _jsonSerializer.Deserialize<GotifyMessage>(jsonTextReader) ?? throw new ArgumentNullException();

            gotifyMessage.Server = gotifyConnectionData.Server;

            using var imageStream = await RetrieveGotifyApplicationImage(gotifyMessage.AppId);

            if (imageStream == null || imageStream.Length == 0)
            {
                Log.Warning("Could not find any application image for notification.");

                return;
            }

            // A Bitmap created from a stream needs that stream for its whole lifetime,
            // but imageStream is disposed as soon as this scope ends. Subscribers are
            // therefore handed a detached copy that no longer depends on the stream.
            using var streamBackedBitmap = new Bitmap(imageStream);
            var image = new Bitmap(streamBackedBitmap);

            GotifyMessage?.Invoke(this,
                new GotifyMessageEventArgs(gotifyMessage, image));

            Log.Debug($"Notification message received: {gotifyMessage.Message}");
        }
        catch (JsonSerializationException exception)
        {
            Log.Warning(exception, "Could not deserialize notification message.");
        }
        catch (Exception exception)
        {
            // Deliberately broad: one bad message must not fault the action block.
            Log.Warning(exception, "Generic failure.");
        }

    }, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });

    // The link is kept in a field so that it can be disposed later.
    _tplWebSocketsBufferBlockTransformLink = _webSocketMessageBufferBlock.LinkTo(webSocketActionBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
}
 
/// <summary>
/// Creates a connection for one Gotify server: configures the HTTP client
/// (certificate policy, proxy, basic authentication) and derives the HTTP and
/// WebSocket endpoint URIs from the server URL.
/// </summary>
/// <param name="server">Server whose Url, Username and Password are used.</param>
/// <param name="configuration">Settings read here for certificate handling and proxy.</param>
/// <param name="cancellationToken">Token stored as the program-wide cancellation token.</param>
public GotifyConnection(Server server, Configuration.Configuration configuration, CancellationToken cancellationToken) : this()
{
    _server = server;
    _configuration = configuration;
    _programCancellationToken = cancellationToken;

    ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
    var httpClientHandler = new HttpClientHandler
    {
        // mono does not implement this
        //SslProtocols = SslProtocols.Tls12
    };

    if (_configuration.IgnoreSelfSignedCertificates)
    {
        // The callback accepts every server certificate unconditionally.
        httpClientHandler.ServerCertificateCustomValidationCallback =
            (httpRequestMessage, cert, certChain, policyErrors) => true;
    }

    if (_configuration.Proxy.Enable)
    {
        httpClientHandler.Proxy = new WebProxy(_configuration.Proxy.Url, false, Array.Empty<string>(),
            new NetworkCredential(_configuration.Proxy.Username, _configuration.Proxy.Password));
    }

    // Create the client exactly once, after the handler is fully configured. A second
    // client created earlier on the same handler would simply be overwritten and leaked.
    _httpClient = new HttpClient(httpClientHandler);
    if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
    {
        _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",
            Convert.ToBase64String(Encoding.Default.GetBytes($"{_server.Username}:{_server.Password}")));
    }

    if (!Uri.TryCreate(_server.Url, UriKind.Absolute, out _httpUri))
    {
        // Both URI fields stay null in this case.
        Log.Error($"No HTTP URL could be built out of the supplied server URI {_server.Url}");
        return;
    }

    // Build the web sockets URI.
    var webSocketsUriBuilder = new UriBuilder(_httpUri);
    switch (webSocketsUriBuilder.Scheme.ToUpperInvariant())
    {
        case "HTTP":
            webSocketsUriBuilder.Scheme = "ws";
            break;
        case "HTTPS":
            webSocketsUriBuilder.Scheme = "wss";
            break;
    }

    if (!Uri.TryCreate(webSocketsUriBuilder.Uri, "stream", out var combinedUri))
    {
        Log.Error($"No WebSockets URL could be built from the provided URL {_server.Url}.");
    }

    // combinedUri is null when the attempt above failed, which leaves the field null.
    _webSocketsUri = combinedUri;
}
 
public void Dispose()
{
if (_localCancellationTokenSource != null)
if (_cancellationTokenSource != null)
{
_localCancellationTokenSource.Dispose();
_localCancellationTokenSource = null;
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}
 
if (_tplWebSocketsBufferBlockTransformLink != null)
if (_webSocketClient != null)
{
_tplWebSocketsBufferBlockTransformLink.Dispose();
_tplWebSocketsBufferBlockTransformLink = null;
_webSocketClient.Dispose();
_webSocketClient = null;
}
 
if (_tplRetrievePastMessagesLink != null)
{
_tplRetrievePastMessagesLink.Dispose();
_tplRetrievePastMessagesLink = null;
}
 
if (_webSocketSharp != null)
{
_webSocketSharp.Close();
_webSocketSharp = null;
}
 
if (_httpClient != null)
{
_httpClient.Dispose();
@@ -211,349 +63,141 @@
 
#region Public Methods
 
public async Task Stop()
public void Start(string username, string password, string host, string port)
{
_localCancellationTokenSource.Cancel();
 
if (_heartBeatTask != null)
if (!Uri.TryCreate($"ws://{host}:{port}/stream", UriKind.RelativeOrAbsolute, out var webSocketsUri))
{
await _heartBeatTask;
return;
}
 
if (_retrievePastMessagesTask != null)
if (!Uri.TryCreate($"http://{host}:{port}/", UriKind.RelativeOrAbsolute, out var httpUri))
{
await _retrievePastMessagesTask;
}
 
if (_webSocketSharp != null)
{
_webSocketSharp.OnMessage -= WebSocketSharp_OnMessage;
_webSocketSharp.OnError -= WebSocketSharp_OnError;
_webSocketSharp.OnOpen -= WebSocketSharp_OnOpen;
_webSocketSharp.OnClose -= WebSocketSharp_OnClose;
 
_webSocketSharp.Close();
_webSocketSharp = null;
}
}
 
public void Start()
{
if (_webSocketsUri == null || _httpUri == null)
{
Log.Error("Could not start connection to server due to unreadable URLs");
return;
}
 
 
_localCancellationTokenSource = new CancellationTokenSource();
_localCancellationToken = _localCancellationTokenSource.Token;
 
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(new [] { _programCancellationToken, _localCancellationToken });
_cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token;
 
if (_webSocketSharp != null)
var httpClientHandler = new HttpClientHandler
{
return;
}
AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
};
 
_webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri);
_webSocketSharp.EmitOnPing = true;
_webSocketSharp.WaitTime = TimeSpan.FromMinutes(1);
_webSocketSharp.SslConfiguration = new ClientSslConfiguration(_webSocketsUri.Host,
new X509CertificateCollection(new X509Certificate[] { }), SslProtocols.Tls12, false);
 
if (_configuration.Proxy.Enable)
_httpClient = new HttpClient(httpClientHandler)
{
if (!string.IsNullOrEmpty(_configuration.Proxy.Url))
{
_webSocketSharp.SetProxy(_configuration.Proxy.Url, _configuration.Proxy.Username, _configuration.Proxy.Password);
}
}
 
if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
{
_webSocketSharp.SetCredentials(_server.Username, _server.Password, true);
}
 
if (_configuration.IgnoreSelfSignedCertificates)
{
_webSocketSharp.SslConfiguration.ServerCertificateValidationCallback +=
(sender, certificate, chain, errors) => true;
}
 
_webSocketSharp.Log.Output = (logData, s) =>
{
Log.Information($"WebSockets low level logging reported: {logData.Message}");
BaseAddress = httpUri
};
var auth = Convert.ToBase64String(Encoding.Default.GetBytes($"{username}:{password}"));
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", auth);
 
_webSocketSharp.OnMessage += WebSocketSharp_OnMessage;
_webSocketSharp.OnError += WebSocketSharp_OnError;
_webSocketSharp.OnOpen += WebSocketSharp_OnOpen;
_webSocketSharp.OnClose += WebSocketSharp_OnClose;
 
_webSocketSharp.Connect();
_heartBeatTask = HeartBeat(_cancellationToken);
 
if (_configuration.RetrievePastNotificationHours != 0)
{
_retrievePastMessagesTask = RetrievePastMessages(_cancellationToken);
}
_runTask = Run(webSocketsUri, username, password, _cancellationToken);
}
 
private async void WebSocketSharp_OnClose(object sender, CloseEventArgs e)
public void Stop()
{
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}");
 
try
if (_cancellationTokenSource != null)
{
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);
 
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
 
await Stop().ContinueWith(task => Start(), _programCancellationToken);
_cancellationTokenSource.Cancel();
}
catch (Exception exception)
{
Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
}
}
 
private void WebSocketSharp_OnOpen(object sender, EventArgs e)
{
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open");
#endregion
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
}
#region Private Methods
 
private async void OnServerNotResponding()
private async Task Run(Uri uri, string username, string password, CancellationToken cancellationToken)
{
Log.Warning($"Server {_server.Name} has not responded in a long while...");
 
try
{
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);
do
{
try
{
_webSocketClient = new ClientWebSocket();
var auth = Convert.ToBase64String(Encoding.Default.GetBytes($"{username}:{password}"));
_webSocketClient.Options.SetRequestHeader("Authorization", $"Basic {auth}");
 
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
await _webSocketClient.ConnectAsync(uri, cancellationToken);
 
await Stop().ContinueWith(task => Start(), _programCancellationToken);
}
catch (Exception exception)
{
Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
}
}
do
{
var payload = new ArraySegment<byte>(new byte[1024]);
 
private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e)
{
Log.Error(
$"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}",
e.Exception);
await _webSocketClient.ReceiveAsync(payload, cancellationToken);
 
try
{
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);
if (payload.Array == null || payload.Count == 0)
{
continue;
}
 
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
var message = Encoding.UTF8.GetString(payload.Array, 0, payload.Count);
 
await Stop().ContinueWith(task => Start(), _programCancellationToken);
}
catch (Exception exception)
{
Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
}
}
var gotifyNotification = JsonConvert.DeserializeObject<GotifyNotification>(message);
if (gotifyNotification == null)
{
continue;
}
 
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e)
{
if (e.IsPing)
{
Log.Information($"Server {_server.Name} sent PING message");
var applications = await _httpClient.GetStringAsync("application");
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
return;
}
var gotifyApplications = JsonConvert.DeserializeObject<GotifyApplication[]>(applications);
if (gotifyApplications == null)
{
continue;
}
 
await _webSocketMessageBufferBlock.SendAsync(new GotifyConnectionData(e.RawData, _server), _cancellationToken);
}
foreach (var application in gotifyApplications)
{
if (application.Id != gotifyNotification.AppId)
{
continue;
}
 
#endregion
var imageBytes = await _httpClient.GetByteArrayAsync(application.Image);
using (var memoryStream = new MemoryStream(imageBytes))
{
var image = Image.FromStream(memoryStream);
 
#region Private Methods
if (GotifyNotification != null)
{
GotifyNotification.Invoke(this,
new GotifyNotificationEventArgs(gotifyNotification, image));
}
}
 
private async Task RetrievePastMessages(CancellationToken cancellationToken)
{
var gotifyApplicationBufferBlock = new BufferBlock<GotifyConnectionApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken });
var gotifyApplicationActionBlock = new ActionBlock<GotifyConnectionApplication>(async gotifyConnectionApplication =>
{
if (!Uri.TryCreate(_httpUri, $"application/{gotifyConnectionApplication.Application.Id}/message", out var combinedUri))
{
Log.Error($"Could not get application message Uri {gotifyConnectionApplication.Application.Id}.");
 
return;
}
break;
}
 
try
{
using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
using var streamReader = new StreamReader(messageStream);
using var jsonTextReader = new JsonTextReader(streamReader);
Debug.WriteLine($"{gotifyNotification.Message}");
} while (!cancellationToken.IsCancellationRequested);
 
var gotifyMessageQuery = _jsonSerializer.Deserialize<GotifyMessageQuery>(jsonTextReader) ??
throw new ArgumentNullException();
 
if (gotifyMessageQuery.Messages == null)
{
Log.Warning("Invalid application messages deserialized.");
 
return;
await _webSocketClient.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty,
CancellationToken.None);
}
 
foreach (var message in gotifyMessageQuery.Messages)
catch (WebSocketException)
{
if (message.Date < DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours))
// Reconnect
if (_webSocketClient != null)
{
continue;
_webSocketClient.Abort();
_webSocketClient.Dispose();
_webSocketClient = null;
}
 
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId);
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification.");
 
continue;
}
 
var image = new Bitmap(imageStream);
message.Server = gotifyConnectionApplication.Server;
 
GotifyMessage?.Invoke(this, new GotifyMessageEventArgs(message, image));
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
catch (HttpRequestException exception)
{
Log.Warning(exception, $"Could not get application {gotifyConnectionApplication.Application.Id}.");
}
catch (JsonSerializationException exception)
{
Log.Warning(exception,
$"Could not deserialize the message response for application {gotifyConnectionApplication.Application.Id}.");
}
catch (Exception exception)
{
Log.Warning(exception, "Generic failure.");
}
}, new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
 
using var _1 = gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock,
new DataflowLinkOptions { PropagateCompletion = true });
 
var tasks = new ConcurrentBag<Task>();
foreach (var application in await RetrieveGotifyApplications())
{
var gotifyConnectionApplication = new GotifyConnectionApplication(_server, application);
 
tasks.Add(gotifyApplicationBufferBlock.SendAsync(gotifyConnectionApplication, cancellationToken));
}
 
await Task.WhenAll(tasks);
gotifyApplicationBufferBlock.Complete();
await gotifyApplicationActionBlock.Completion;
}
 
private async Task HeartBeat(CancellationToken cancellationToken)
{
try
{
do
{
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
 
_webSocketsClientPingStopWatch.Restart();
if (!_webSocketSharp.Ping())
{
Log.Warning($"Server {_server.Name} did not respond to PING message.");
continue;
}
 
var delta = _webSocketsClientPingStopWatch.ElapsedMilliseconds;
 
Log.Information($"PING response latency for {_server.Name} is {delta}ms");
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
 
} while (!cancellationToken.IsCancellationRequested);
}
catch (Exception exception) when (exception is OperationCanceledException ||
exception is ObjectDisposedException)
catch (Exception ex) when (ex is OperationCanceledException || ex is ObjectDisposedException)
{
}
catch (Exception exception)
catch (Exception ex)
{
Log.Warning(exception, $"Heartbeat for server {_server.Name} has failed.");
Debug.WriteLine($"Exception: {ex}");
}
}
 
/// <summary>
/// Downloads the list of applications registered on the server from the
/// "application" endpoint relative to the HTTP base URI.
/// </summary>
/// <returns>
/// The deserialized applications; an empty array when the URL cannot be built, the
/// request or deserialization fails, or the response deserializes to null. Never null.
/// </returns>
private async Task<GotifyApplication[]> RetrieveGotifyApplications()
{
    if (!Uri.TryCreate(_httpUri, "application", out var combinedUri))
    {
        Log.Error($"No application URL could be built for {_server.Url}.");

        return Array.Empty<GotifyApplication>();
    }

    try
    {
        using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
        using var streamReader = new StreamReader(messageStream);
        using var jsonTextReader = new JsonTextReader(streamReader);

        // The serializer can hand back null (for instance for a JSON null body);
        // callers enumerate the result directly, so normalise that to an empty array.
        return _jsonSerializer.Deserialize<GotifyApplication[]>(jsonTextReader)
               ?? Array.Empty<GotifyApplication>();
    }
    catch (Exception exception)
    {
        Log.Warning(exception,"Could not deserialize the list of applications from the server.");

        return Array.Empty<GotifyApplication>();
    }
}
 
/// <summary>
/// Looks up the application with the given identifier and downloads its image.
/// </summary>
/// <param name="appId">Identifier compared against each application's Id.</param>
/// <returns>
/// A stream positioned at its start that holds the image bytes. The stream is empty
/// (length zero) when no matching application exists or its image could not be
/// downloaded. The caller owns the stream and is responsible for disposing it.
/// </returns>
private async Task<Stream> RetrieveGotifyApplicationImage(int appId)
{
    var memoryStream = new MemoryStream();

    foreach (var application in await RetrieveGotifyApplications())
    {
        if (application.Id != appId)
        {
            continue;
        }

        // Join the URL segments explicitly. Path.Combine is meant for file system paths:
        // it picks a platform specific separator and drops the base entirely when the
        // second segment starts with a slash.
        var baseUrl = $"{_httpUri}".TrimEnd('/');
        var imagePath = $"{application.Image}".TrimStart('/');

        if (!Uri.TryCreate($"{baseUrl}/{imagePath}", UriKind.Absolute, out var applicationImageUri))
        {
            Log.Warning("Could not build URL path to application icon");

            continue;
        }

        try
        {
            using var imageResponse = await _httpClient.GetStreamAsync(applicationImageUri);

            await imageResponse.CopyToAsync(memoryStream);

            // Copying leaves the position at the end; rewind so that consumers reading
            // from the current position see the image data.
            memoryStream.Position = 0;

            return memoryStream;
        }
        catch (Exception exception)
        {
            Log.Error(exception,"Could not retrieve application image.");

            // Drop anything written before the failure so that callers checking the
            // length do not mistake a truncated download for a usable image.
            memoryStream.SetLength(0);
        }
    }

    return memoryStream;
}
 
#endregion
}
}