/trunk/Winify/Gotify/GotifyConnection.cs |
@@ -1,4 +1,6 @@ |
using System; |
using System.Collections.Concurrent; |
using System.Collections.Generic; |
using System.Diagnostics; |
using System.Drawing; |
using System.IO; |
@@ -5,12 +7,21 @@ |
using System.Net; |
using System.Net.Http; |
using System.Net.Http.Headers; |
using System.Net.WebSockets; |
using System.Runtime.CompilerServices; |
using System.Security.Authentication; |
using System.Security.Cryptography.X509Certificates; |
using System.Text; |
using System.Threading; |
using System.Threading.Tasks; |
using System.Threading.Tasks.Dataflow; |
using Newtonsoft.Json; |
using ClientWebSocket = System.Net.WebSockets.Managed.ClientWebSocket; |
using Serilog; |
using Servers; |
using WebSocketSharp; |
using WebSocketSharp.Net; |
using Winify.Utilities; |
using ErrorEventArgs = WebSocketSharp.ErrorEventArgs; |
using NetworkCredential = System.Net.NetworkCredential; |
|
namespace Winify.Gotify |
{ |
@@ -18,40 +29,177 @@ |
{ |
#region Public Events & Delegates |
|
/// <summary>
/// Notification event carrying <see cref="GotifyNotificationEventArgs"/>.
/// NOTE(review): the only raise site visible in this view sits among lines that look like
/// they belong to a removed revision — confirm whether anything still raises this event.
/// </summary>
public event EventHandler<GotifyNotificationEventArgs> GotifyNotification;
/// <summary>
/// Raised with the deserialized message and its application image, both for messages
/// arriving over the WebSockets connection and for past messages retrieved over HTTP.
/// </summary>
public event EventHandler<GotifyMessageEventArgs> GotifyMessage;
|
#endregion |
|
#region Private Delegates, Events, Enums, Properties, Indexers and Fields |
|
// NOTE(review): this field list appears to mix declarations from two revisions of the file
// (for example both a ClientWebSocket and a WebSocketSharp client); confirm which are live.

// Server this connection talks to; supplies Url, Username, Password and Name.
private readonly Server _server;

// Token of _cancellationTokenSource; assigned in Start(), so it is still the default
// token while the private constructor runs.
private CancellationToken _cancellationToken;

// Source linking the program token with the connection-local token; created in Start().
private CancellationTokenSource _cancellationTokenSource;

// Task returned by HeartBeat(); awaited by Stop().
private Task _heartBeatTask;

// HTTP client used for the application list, past messages and application images.
private HttpClient _httpClient;

// NOTE(review): only assigned from a Run(...) call that looks like removed-revision code.
private Task _runTask;
// "stream" endpoint derived from the server URL with the scheme mapped to ws/wss.
private readonly Uri _webSocketsUri;

// NOTE(review): managed ClientWebSocket; its uses look like removed-revision code — confirm.
private ClientWebSocket _webSocketClient;
// Absolute HTTP(S) URI parsed from _server.Url.
private readonly Uri _httpUri;
// WebSocketSharp client created in Start() and closed in Stop().
private WebSocket _webSocketSharp;
// Application configuration (proxy, certificate handling, past-notification window).
private readonly Configuration.Configuration _configuration;
// NOTE(review): disposed in Dispose() but no assignment is visible in this view.
private IDisposable _tplRetrievePastMessagesLink;
// Link between the message buffer block and its action block, created in the private constructor.
private IDisposable _tplWebSocketsBufferBlockTransformLink;
// Queue of raw WebSockets payloads awaiting deserialization.
private readonly BufferBlock<GotifyConnectionData> _webSocketMessageBufferBlock;
// Measures the round-trip time of the PING issued by HeartBeat().
private readonly Stopwatch _webSocketsClientPingStopWatch;
// Schedules OnServerNotResponding(); re-scheduled on open, on server PING and after a successful client PING.
private readonly ScheduledContinuation _webSocketsServerResponseScheduledContinuation;

// Task returned by RetrievePastMessages(); awaited by Stop().
private Task _retrievePastMessagesTask;
// Static, hence shared by all connections, although it is assigned in the instance constructor.
private static JsonSerializer _jsonSerializer;
// Token supplied by the caller of the public constructor.
private readonly CancellationToken _programCancellationToken;
// Connection-local source created in Start() and cancelled in Stop().
private CancellationTokenSource _localCancellationTokenSource;
// Token of _localCancellationTokenSource.
private CancellationToken _localCancellationToken;
|
#endregion |
|
#region Constructors, Destructors and Finalizers |
|
/// <summary>
/// Initializes the state that does not depend on constructor arguments: the JSON serializer,
/// the server-response watchdog, the ping stopwatch and the dataflow pipeline that turns raw
/// WebSockets payloads into <see cref="GotifyMessage"/> events.
/// </summary>
private GotifyConnection()
{
    // The serializer is stored in a static field; create it once rather than replacing the
    // shared instance every time another connection is constructed.
    _jsonSerializer ??= new JsonSerializer();
    _webSocketsServerResponseScheduledContinuation = new ScheduledContinuation();
    _webSocketsClientPingStopWatch = new Stopwatch();

    // NOTE: _cancellationToken is only assigned in Start(), so while this constructor runs it
    // is still the default token; the blocks below are therefore not cancelled by Stop().
    _webSocketMessageBufferBlock = new BufferBlock<GotifyConnectionData>(
        new DataflowBlockOptions
        {
            CancellationToken = _cancellationToken
        });

    var webSocketActionBlock = new ActionBlock<GotifyConnectionData>(async gotifyConnectionData =>
    {
        try
        {
            using var memoryStream = new MemoryStream(gotifyConnectionData.Payload);
            using var streamReader = new StreamReader(memoryStream);
            using var jsonTextReader = new JsonTextReader(streamReader);

            var gotifyMessage = _jsonSerializer.Deserialize<GotifyMessage>(jsonTextReader) ?? throw new ArgumentNullException();

            gotifyMessage.Server = gotifyConnectionData.Server;

            using var imageStream = await RetrieveGotifyApplicationImage(gotifyMessage.AppId);

            if (imageStream == null || imageStream.Length == 0)
            {
                Log.Warning("Could not find any application image for notification.");

                return;
            }

            // RetrieveGotifyApplicationImage fills the stream with CopyToAsync and returns it
            // without rewinding, so start decoding from the first byte.
            if (imageStream.CanSeek)
            {
                imageStream.Position = 0;
            }

            // A Bitmap built on a stream needs that stream for its whole lifetime, but the
            // stream is disposed when this delegate returns. Hand subscribers a detached copy.
            Bitmap image;
            using (var streamBackedBitmap = new Bitmap(imageStream))
            {
                image = new Bitmap(streamBackedBitmap);
            }

            GotifyMessage?.Invoke(this,
                new GotifyMessageEventArgs(gotifyMessage, image));

            Log.Debug($"Notification message received: {gotifyMessage.Message}");
        }
        catch (JsonSerializationException exception)
        {
            Log.Warning(exception, "Could not deserialize notification message.");
        }
        catch (Exception exception)
        {
            // Deliberately broad: one bad message must not fault the block and stop the pipeline.
            Log.Warning(exception, "Generic failure.");
        }

    }, new ExecutionDataflowBlockOptions { CancellationToken = _cancellationToken });

    _tplWebSocketsBufferBlockTransformLink = _webSocketMessageBufferBlock.LinkTo(webSocketActionBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
}
|
/// <summary>
/// Creates a connection for <paramref name="server"/>: configures the HTTP client (TLS 1.2,
/// optional certificate bypass, optional proxy, optional basic authentication) and derives the
/// HTTP and WebSockets URIs from the server URL. When a URI cannot be built the error is
/// logged and the corresponding field stays null, which makes <c>Start()</c> refuse to run.
/// </summary>
/// <param name="server">Server settings (URL, name and optional credentials).</param>
/// <param name="configuration">Application configuration (proxy and certificate options).</param>
/// <param name="cancellationToken">Program-wide token stored for later linking in <c>Start()</c>.</param>
public GotifyConnection(Server server, Configuration.Configuration configuration, CancellationToken cancellationToken) : this()
{
    _server = server;
    _configuration = configuration;
    _programCancellationToken = cancellationToken;

    // ServicePointManager is static, so this setting applies process-wide.
    ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;

    var httpClientHandler = new HttpClientHandler
    {
        // mono does not implement this
        //SslProtocols = SslProtocols.Tls12
    };

    if (_configuration.IgnoreSelfSignedCertificates)
    {
        // The callback accepts every certificate, whatever the reported policy errors are.
        httpClientHandler.ServerCertificateCustomValidationCallback =
            (httpRequestMessage, cert, cetChain, policyErrors) => true;
    }

    if (_configuration.Proxy.Enable)
    {
        httpClientHandler.Proxy = new WebProxy(_configuration.Proxy.Url, false, new string[] { },
            new NetworkCredential(_configuration.Proxy.Username, _configuration.Proxy.Password));
    }

    // Create the client exactly once, after the handler is fully configured. A second client
    // built on the same handler would simply replace the first one, which would never be disposed.
    _httpClient = new HttpClient(httpClientHandler);

    if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
    {
        _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",
            Convert.ToBase64String(Encoding.Default.GetBytes($"{_server.Username}:{_server.Password}")));
    }

    if (!Uri.TryCreate(_server.Url, UriKind.Absolute, out _httpUri))
    {
        Log.Error($"No HTTP URL could be built out of the supplied server URI {_server.Url}");
        return;
    }

    // Build the web sockets URI.
    var webSocketsUriBuilder = new UriBuilder(_httpUri);
    switch (webSocketsUriBuilder.Scheme.ToUpperInvariant())
    {
        case "HTTP":
            webSocketsUriBuilder.Scheme = "ws";
            break;
        case "HTTPS":
            webSocketsUriBuilder.Scheme = "wss";
            break;
    }

    if (!Uri.TryCreate(webSocketsUriBuilder.Uri, "stream", out var combinedUri))
    {
        Log.Error($"No WebSockets URL could be built from the provided URL {_server.Url}.");
    }

    // combinedUri is null when the creation above failed.
    _webSocketsUri = combinedUri;
}
|
public void Dispose() |
{ |
if (_cancellationTokenSource != null) |
if (_localCancellationTokenSource != null) |
{ |
_cancellationTokenSource.Dispose(); |
_cancellationTokenSource = null; |
_localCancellationTokenSource.Dispose(); |
_localCancellationTokenSource = null; |
} |
|
if (_webSocketClient != null) |
if (_tplWebSocketsBufferBlockTransformLink != null) |
{ |
_webSocketClient.Dispose(); |
_webSocketClient = null; |
_tplWebSocketsBufferBlockTransformLink.Dispose(); |
_tplWebSocketsBufferBlockTransformLink = null; |
} |
|
if (_tplRetrievePastMessagesLink != null) |
{ |
_tplRetrievePastMessagesLink.Dispose(); |
_tplRetrievePastMessagesLink = null; |
} |
|
if (_webSocketSharp != null) |
{ |
_webSocketSharp.Close(); |
_webSocketSharp = null; |
} |
|
if (_httpClient != null) |
{ |
_httpClient.Dispose(); |
@@ -63,141 +211,349 @@ |
|
#region Public Methods |
|
// NOTE(review): this span appears to interleave lines from two revisions: an older
// Start(username, password, host, port) and the Stop() declared below. The code is left
// untouched; the description covers the Stop() lines only.
public void Start(string username, string password, string host, string port)
// Stops the connection: cancels the connection-local token source, awaits the heartbeat and
// past-message tasks when they exist, then detaches the four WebSocketSharp handlers,
// closes the socket and clears the field.
// NOTE(review): _localCancellationTokenSource is dereferenced without a null check and is
// only assigned in Start().
public async Task Stop()
{
if (!Uri.TryCreate($"ws://{host}:{port}/stream", UriKind.RelativeOrAbsolute, out var webSocketsUri))
_localCancellationTokenSource.Cancel();

if (_heartBeatTask != null)
{
return;
await _heartBeatTask;
}

if (!Uri.TryCreate($"http://{host}:{port}/", UriKind.RelativeOrAbsolute, out var httpUri))
if (_retrievePastMessagesTask != null)
{
await _retrievePastMessagesTask;
}

if (_webSocketSharp != null)
{
// Unsubscribe before closing the socket.
_webSocketSharp.OnMessage -= WebSocketSharp_OnMessage;
_webSocketSharp.OnError -= WebSocketSharp_OnError;
_webSocketSharp.OnOpen -= WebSocketSharp_OnOpen;
_webSocketSharp.OnClose -= WebSocketSharp_OnClose;

_webSocketSharp.Close();
_webSocketSharp = null;
}
}
|
// Starts the connection. Logs an error and returns when either URI is null. Otherwise it
// creates a connection-local token source linked with the program token, returns early if a
// WebSocketSharp client already exists, and else configures a new client (ping events,
// one-minute wait time, TLS 1.2, optional proxy, optional credentials, optional certificate
// bypass, log forwarding), subscribes the handlers, connects, starts the heartbeat and, when
// RetrievePastNotificationHours is non-zero, the retrieval of past messages.
// NOTE(review): lines from an older revision (HttpClientHandler/HttpClient setup, the
// Run(...) call) appear interleaved with the current ones; the code is left untouched.
public void Start()
{
if (_webSocketsUri == null || _httpUri == null)
{
Log.Error("Could not start connection to server due to unreadable URLs");
return;
}

_cancellationTokenSource = new CancellationTokenSource();

_localCancellationTokenSource = new CancellationTokenSource();
_localCancellationToken = _localCancellationTokenSource.Token;

// Cancelling either the program token or the local token cancels _cancellationToken.
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(new [] { _programCancellationToken, _localCancellationToken });
_cancellationToken = _cancellationTokenSource.Token;

var httpClientHandler = new HttpClientHandler
// A client already exists: nothing more to do. The token sources above have already been
// replaced by the time this check runs.
if (_webSocketSharp != null)
{
AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
};
return;
}

_httpClient = new HttpClient(httpClientHandler)
_webSocketSharp = new WebSocket(_webSocketsUri.AbsoluteUri);
// Server PINGs are handled in WebSocketSharp_OnMessage through e.IsPing.
_webSocketSharp.EmitOnPing = true;
_webSocketSharp.WaitTime = TimeSpan.FromMinutes(1);
_webSocketSharp.SslConfiguration = new ClientSslConfiguration(_webSocketsUri.Host,
new X509CertificateCollection(new X509Certificate[] { }), SslProtocols.Tls12, false);

if (_configuration.Proxy.Enable)
{
BaseAddress = httpUri
if (!string.IsNullOrEmpty(_configuration.Proxy.Url))
{
_webSocketSharp.SetProxy(_configuration.Proxy.Url, _configuration.Proxy.Username, _configuration.Proxy.Password);
}
}

if (!string.IsNullOrEmpty(_server.Username) && !string.IsNullOrEmpty(_server.Password))
{
_webSocketSharp.SetCredentials(_server.Username, _server.Password, true);
}

if (_configuration.IgnoreSelfSignedCertificates)
{
// The callback accepts every certificate, whatever errors are reported.
_webSocketSharp.SslConfiguration.ServerCertificateValidationCallback +=
(sender, certificate, chain, errors) => true;
}

// Forward the library's own log output to the application log.
_webSocketSharp.Log.Output = (logData, s) =>
{
Log.Information($"WebSockets low level logging reported: {logData.Message}");
};
var auth = Convert.ToBase64String(Encoding.Default.GetBytes($"{username}:{password}"));
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", auth);

_runTask = Run(webSocketsUri, username, password, _cancellationToken);
_webSocketSharp.OnMessage += WebSocketSharp_OnMessage;
_webSocketSharp.OnError += WebSocketSharp_OnError;
_webSocketSharp.OnOpen += WebSocketSharp_OnOpen;
_webSocketSharp.OnClose += WebSocketSharp_OnClose;

_webSocketSharp.Connect();
_heartBeatTask = HeartBeat(_cancellationToken);

if (_configuration.RetrievePastNotificationHours != 0)
{
_retrievePastMessagesTask = RetrievePastMessages(_cancellationToken);
}
}
|
// NOTE(review): lines of an older synchronous Stop() appear interleaved with the close
// handler declared below; the code is left untouched.
public void Stop()
// Close handler: logs the close reason, waits one second (cancellable through the program
// token), then awaits Stop() with Start() attached as a continuation. Any exception raised
// along the way is logged as an error.
private async void WebSocketSharp_OnClose(object sender, CloseEventArgs e)
{
if (_cancellationTokenSource != null)
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}");

try
{
_cancellationTokenSource.Cancel();
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

// The continuation has no completion-state filter, so Start() runs whether Stop()
// succeeded or faulted.
await Stop().ContinueWith(task => Start(), _programCancellationToken);
}
catch (Exception exception)
{
Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
}
}
|
/// <summary>
/// Open handler: records that the connection is up and schedules
/// <see cref="OnServerNotResponding"/> one minute from now.
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">Event arguments; not used.</param>
private void WebSocketSharp_OnOpen(object sender, EventArgs e)
{
    Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open");

    // Presumably a later Schedule call replaces this one, so the callback only fires after a
    // full minute of silence — ScheduledContinuation is defined elsewhere; confirm.
    var silenceLimit = TimeSpan.FromMinutes(1);
    _webSocketsServerResponseScheduledContinuation.Schedule(silenceLimit, OnServerNotResponding, _cancellationToken);
}
|
/// <summary>
/// Callback scheduled through the server-response continuation: logs that the server has
/// been silent, waits one second and restarts the connection.
/// </summary>
/// <remarks>
/// The method is <c>async void</c>, so every exception is handled here; nothing may escape.
/// </remarks>
private async void OnServerNotResponding()
{
    Log.Warning($"Server {_server.Name} has not responded in a long while...");

    try
    {
        await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

        Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

        // The continuation has no completion-state filter, so Start() runs whether Stop()
        // succeeded or faulted; only cancellation of the program token prevents it.
        await Stop().ContinueWith(task => Start(), _programCancellationToken);
    }
    catch (OperationCanceledException)
    {
        // The program token was cancelled while waiting or restarting: this is a shutdown,
        // not a failure, so it is not reported as an error.
        Log.Information($"Reconnection to websocket server {_webSocketsUri.AbsoluteUri} was cancelled.");
    }
    catch (Exception exception)
    {
        Log.Error(exception, "Error restarting WebSockets connection after the server stopped responding.");
    }
}
|
/// <summary>
/// Error handler: logs the reported error together with its exception, waits one second
/// and restarts the connection.
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">Carries the error message and, possibly, the originating exception.</param>
/// <remarks>
/// The method is <c>async void</c>, so every exception is handled here; nothing may escape.
/// </remarks>
private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e)
{
    // The exception goes first: passed after the message it is treated as a template
    // property, and since the template has no placeholder it would be dropped from the log.
    Log.Error(
        e.Exception,
        $"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}");

    try
    {
        await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

        Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

        // The continuation has no completion-state filter, so Start() runs whether Stop()
        // succeeded or faulted; only cancellation of the program token prevents it.
        await Stop().ContinueWith(task => Start(), _programCancellationToken);
    }
    catch (OperationCanceledException)
    {
        // The program token was cancelled while waiting or restarting: this is a shutdown,
        // not a failure, so it is not reported as an error.
        Log.Information($"Reconnection to websocket server {_webSocketsUri.AbsoluteUri} was cancelled.");
    }
    catch (Exception exception)
    {
        Log.Error(exception, "Error restarting WebSockets connection after a connection error.");
    }
}
|
/// <summary>
/// Message handler: a server PING re-schedules <see cref="OnServerNotResponding"/> one
/// minute from now; any other message is queued on the buffer block for deserialization.
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">The received frame; <c>RawData</c> is queued unchanged.</param>
/// <remarks>
/// The method is <c>async void</c>, so a failure while queuing is handled here instead of
/// escaping as an unhandled exception.
/// </remarks>
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e)
{
    if (e.IsPing)
    {
        Log.Information($"Server {_server.Name} sent PING message");

        _webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
        return;
    }

    try
    {
        var accepted = await _webSocketMessageBufferBlock.SendAsync(new GotifyConnectionData(e.RawData, _server), _cancellationToken);
        if (!accepted)
        {
            Log.Warning($"A message from server {_server.Name} was declined by the processing queue.");
        }
    }
    catch (OperationCanceledException)
    {
        // Stop() cancels the token before the handlers are detached, so a message can still
        // arrive in between; dropping it is expected.
    }
    catch (Exception exception)
    {
        Log.Warning(exception, $"Could not queue a message from server {_server.Name} for processing.");
    }
}
|
#endregion |
|
#region Private Methods |
|
// NOTE(review): this span appears to interleave lines from two revisions: an older
// Run(uri, username, password, token) receive loop and the RetrievePastMessages(token)
// method declared below. The code is left untouched; the description covers the
// RetrievePastMessages lines only.
private async Task Run(Uri uri, string username, string password, CancellationToken cancellationToken)
// Retrieves past messages: every application returned by RetrieveGotifyApplications() is
// sent through a buffer block to an action block. The action block requests
// "application/{id}/message", deserializes the response into a GotifyMessageQuery, skips
// messages older than the configured number of hours, retrieves the application image and
// raises GotifyMessage. The method completes once the action block has completed.
private async Task RetrievePastMessages(CancellationToken cancellationToken)
{
try
var gotifyApplicationBufferBlock = new BufferBlock<GotifyConnectionApplication>(new DataflowBlockOptions { CancellationToken = cancellationToken });
var gotifyApplicationActionBlock = new ActionBlock<GotifyConnectionApplication>(async gotifyConnectionApplication =>
{
do
if (!Uri.TryCreate(_httpUri, $"application/{gotifyConnectionApplication.Application.Id}/message", out var combinedUri))
{
Log.Error($"Could not get application message Uri {gotifyConnectionApplication.Application.Id}.");

return;
}

try
{
_webSocketClient = new ClientWebSocket();
var auth = Convert.ToBase64String(Encoding.Default.GetBytes($"{username}:{password}"));
_webSocketClient.Options.SetRequestHeader("Authorization", $"Basic {auth}");
using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
using var streamReader = new StreamReader(messageStream);
using var jsonTextReader = new JsonTextReader(streamReader);

await _webSocketClient.ConnectAsync(uri, cancellationToken);
var gotifyMessageQuery = _jsonSerializer.Deserialize<GotifyMessageQuery>(jsonTextReader) ??
throw new ArgumentNullException();

do
if (gotifyMessageQuery.Messages == null)
{
var payload = new ArraySegment<byte>(new byte[1024]);
Log.Warning("Invalid application messages deserialized.");

await _webSocketClient.ReceiveAsync(payload, cancellationToken);
return;
}

if (payload.Array == null || payload.Count == 0)
foreach (var message in gotifyMessageQuery.Messages)
{
// Skip messages older than the configured retrieval window.
if (message.Date < DateTime.Now - TimeSpan.FromHours(_configuration.RetrievePastNotificationHours))
{
continue;
}

var message = Encoding.UTF8.GetString(payload.Array, 0, payload.Count);
using var imageStream = await RetrieveGotifyApplicationImage(message.AppId);
if (imageStream == null || imageStream.Length == 0)
{
Log.Warning("Could not find any application image for notification.");

var gotifyNotification = JsonConvert.DeserializeObject<GotifyNotification>(message);
if (gotifyNotification == null)
{
continue;
}

var applications = await _httpClient.GetStringAsync("application");
// NOTE(review): the Bitmap is built on a stream that is disposed at the end of the
// iteration, and RetrieveGotifyApplicationImage returns that stream without rewinding it.
var image = new Bitmap(imageStream);
message.Server = gotifyConnectionApplication.Server;

var gotifyApplications = JsonConvert.DeserializeObject<GotifyApplication[]>(applications);
if (gotifyApplications == null)
GotifyMessage?.Invoke(this, new GotifyMessageEventArgs(message, image));
}
}
catch (HttpRequestException exception)
{
continue;
Log.Warning(exception, $"Could not get application {gotifyConnectionApplication.Application.Id}.");
}
catch (JsonSerializationException exception)
{
Log.Warning(exception,
$"Could not deserialize the message response for application {gotifyConnectionApplication.Application.Id}.");
}
catch (Exception exception)
{
Log.Warning(exception, "Generic failure.");
}
}, new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });

foreach (var application in gotifyApplications)
using var _1 = gotifyApplicationBufferBlock.LinkTo(gotifyApplicationActionBlock,
new DataflowLinkOptions { PropagateCompletion = true });

var tasks = new ConcurrentBag<Task>();
foreach (var application in await RetrieveGotifyApplications())
{
if (application.Id != gotifyNotification.AppId)
var gotifyConnectionApplication = new GotifyConnectionApplication(_server, application);

tasks.Add(gotifyApplicationBufferBlock.SendAsync(gotifyConnectionApplication, cancellationToken));
}

// Wait until every application is queued, then complete the buffer so that completion
// propagates to the action block through the link.
await Task.WhenAll(tasks);
gotifyApplicationBufferBlock.Complete();
await gotifyApplicationActionBlock.Completion;
}
|
// Heartbeat loop: once a minute it restarts the stopwatch and pings the server. A failed
// ping is logged as a warning and the loop continues; a successful ping logs the measured
// latency and re-schedules OnServerNotResponding one minute from now. The loop ends when
// the token is cancelled.
// NOTE(review): lines of an older revision (image download, GotifyNotification raise)
// appear interleaved with the loop; the code is left untouched.
private async Task HeartBeat(CancellationToken cancellationToken)
{
try
{
do
{
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);

_webSocketsClientPingStopWatch.Restart();
if (!_webSocketSharp.Ping())
{
Log.Warning($"Server {_server.Name} did not respond to PING message.");
continue;
}

var imageBytes = await _httpClient.GetByteArrayAsync(application.Image);
using (var memoryStream = new MemoryStream(imageBytes))
{
var image = Image.FromStream(memoryStream);
var delta = _webSocketsClientPingStopWatch.ElapsedMilliseconds;

if (GotifyNotification != null)
Log.Information($"PING response latency for {_server.Name} is {delta}ms");

_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);

} while (!cancellationToken.IsCancellationRequested);
}
catch (Exception exception) when (exception is OperationCanceledException ||
exception is ObjectDisposedException)
{
GotifyNotification.Invoke(this,
new GotifyNotificationEventArgs(gotifyNotification, image));
}
catch (Exception exception)
{
Log.Warning(exception, $"Heartbeat for server {_server.Name} has failed.");
}
}
|
// Requests the "application" resource relative to _httpUri and deserializes the response
// into a GotifyApplication array. Returns an empty array when the URI cannot be built or
// when the request or the deserialization throws.
// NOTE(review): the deserializer result is returned as-is; presumably it can be null for an
// empty response body, which callers iterating the result would not survive — confirm.
// NOTE(review): lines of an older revision (WebSocket close/abort handling) appear
// interleaved with this method; the code is left untouched.
private async Task<GotifyApplication[]> RetrieveGotifyApplications()
{
if (!Uri.TryCreate(_httpUri, "application", out var combinedUri))
{
Log.Error($"No application URL could be built for {_server.Url}.");

break;
return Array.Empty<GotifyApplication>();
}

Debug.WriteLine($"{gotifyNotification.Message}");
} while (!cancellationToken.IsCancellationRequested);
try
{
using var messageStream = await _httpClient.GetStreamAsync(combinedUri);
using var streamReader = new StreamReader(messageStream);
using var jsonTextReader = new JsonTextReader(streamReader);

await _webSocketClient.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty,
CancellationToken.None);
return _jsonSerializer.Deserialize<GotifyApplication[]>(jsonTextReader);
}
catch (WebSocketException)
catch (Exception exception)
{
// Reconnect
if (_webSocketClient != null)
Log.Warning(exception,"Could not deserialize the list of applications from the server.");

return Array.Empty<GotifyApplication>();
}
}
|
// Downloads the image of the application whose id equals appId into a MemoryStream and
// returns it. The application list is fetched through RetrieveGotifyApplications() on every
// call. When no application matches, the image URL cannot be built, or the download throws,
// the returned stream is empty.
// NOTE(review): after CopyToAsync the stream is returned without resetting its position, so
// it is positioned at the end of the image data.
// NOTE(review): the URL is assembled with Path.Combine, a file-system helper; the result
// presumably depends on _httpUri ending with a slash — confirm.
// NOTE(review): lines of an older revision (WebSocket abort, reconnect delay) appear
// interleaved with this method; the code is left untouched.
private async Task<Stream> RetrieveGotifyApplicationImage(int appId)
{
_webSocketClient.Abort();
_webSocketClient.Dispose();
_webSocketClient = null;
var memoryStream = new MemoryStream();

foreach (var application in await RetrieveGotifyApplications())
{
if (application.Id != appId)
{
continue;
}

await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
if (!Uri.TryCreate(Path.Combine($"{_httpUri}", $"{application.Image}"), UriKind.Absolute, out var applicationImageUri))
{
Log.Warning("Could not build URL path to application icon");

continue;
}
} while (!cancellationToken.IsCancellationRequested);
}
catch (Exception ex) when (ex is OperationCanceledException || ex is ObjectDisposedException)

try
{
using var imageResponse = await _httpClient.GetStreamAsync(applicationImageUri);

await imageResponse.CopyToAsync(memoryStream);

return memoryStream;
}
catch (Exception ex)
catch (Exception exception)
{
Debug.WriteLine($"Exception: {ex}");
Log.Error(exception,"Could not retrieve application image.");
}
}

return memoryStream;
}
|
#endregion |
} |
} |