Winify

Subversion Repositories:
Compare Path: Rev
With Path: Rev
?path1? @ 82  →  ?path2? @ 83
/trunk/Winify/Gotify/GotifyConnection.cs
@@ -58,6 +58,9 @@
private Task _retrievePastMessagesTask;
private static JsonSerializer _jsonSerializer;
private readonly CancellationToken _programCancellationToken;
private CancellationTokenSource _localCancellationTokenSource;
private CancellationToken _localCancellationToken;
 
#endregion
 
@@ -118,10 +121,11 @@
new DataflowLinkOptions { PropagateCompletion = true });
}
 
public GotifyConnection(Server server, Configuration.Configuration configuration) : this()
public GotifyConnection(Server server, Configuration.Configuration configuration, CancellationToken cancellationToken) : this()
{
_server = server;
_configuration = configuration;
_programCancellationToken = cancellationToken;
 
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
var httpClientHandler = new HttpClientHandler
@@ -172,10 +176,10 @@
 
public void Dispose()
{
if (_cancellationTokenSource != null)
if (_localCancellationTokenSource != null)
{
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
_localCancellationTokenSource.Dispose();
_localCancellationTokenSource = null;
}
 
if (_tplWebSocketsBufferBlockTransformLink != null)
@@ -207,6 +211,32 @@
 
#region Public Methods
 
/// <summary>
/// Stops this connection: cancels the local token source, waits for the heartbeat and
/// past-messages tasks, detaches the WebSocket event handlers, closes the socket, and
/// releases the cancellation token sources created by <c>Start()</c>.
/// </summary>
/// <returns>A task that completes once the teardown has finished.</returns>
/// <remarks>
/// A missing token source (Stop called before Start, or after Dispose) is skipped instead of
/// raising a <see cref="NullReferenceException"/>. Exceptions thrown by the awaited background
/// tasks still propagate to the caller, but only after the socket and the token sources have
/// been cleaned up.
/// </remarks>
public async Task Stop()
{
    // Null-safe: Dispose() sets this field to null and Start() is what creates it.
    _localCancellationTokenSource?.Cancel();

    try
    {
        if (_heartBeatTask != null)
        {
            await _heartBeatTask;
        }

        if (_retrievePastMessagesTask != null)
        {
            await _retrievePastMessagesTask;
        }
    }
    finally
    {
        if (_webSocketSharp != null)
        {
            // Detach the handlers before closing so the close itself cannot trigger another restart.
            _webSocketSharp.OnMessage -= WebSocketSharp_OnMessage;
            _webSocketSharp.OnError -= WebSocketSharp_OnError;
            _webSocketSharp.OnOpen -= WebSocketSharp_OnOpen;
            _webSocketSharp.OnClose -= WebSocketSharp_OnClose;

            _webSocketSharp.Close();
            _webSocketSharp = null;
        }

        // Start() builds fresh sources on every (re)connect. The linked source holds a
        // registration on the program-wide token, so it must be disposed here or each
        // reconnect leaks one registration.
        _cancellationTokenSource?.Dispose();
        _cancellationTokenSource = null;

        _localCancellationTokenSource?.Dispose();
        _localCancellationTokenSource = null;
    }
}
 
public void Start()
{
if (_webSocketsUri == null || _httpUri == null)
@@ -215,7 +245,11 @@
return;
}
 
_cancellationTokenSource = new CancellationTokenSource();
 
_localCancellationTokenSource = new CancellationTokenSource();
_localCancellationToken = _localCancellationTokenSource.Token;
 
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(new [] { _programCancellationToken, _localCancellationToken });
_cancellationToken = _cancellationTokenSource.Token;
 
if (_webSocketSharp != null)
@@ -269,14 +303,20 @@
 
// WebSocket close handler: logs the close reason, waits one second, then restarts the
// connection by running Stop() with Start() as its continuation.
// This span shows the r82 and r83 revisions interleaved, so near-duplicate neighbouring lines
// appear to be the old statement followed by its replacement.
private async void WebSocketSharp_OnClose(object sender, CloseEventArgs e)
{
Log.Information(
$"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}");
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}");

await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken);
try
{
// NOTE(review): if the program token is cancelled during this delay, Task.Delay throws and the
// catch below logs it as an error — confirm that is the intended behaviour on shutdown.
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

// ContinueWith without continuation options runs Start() whatever the outcome of Stop();
// a fault inside Stop() is not rethrown by awaiting the continuation.
await Stop().ContinueWith(task => Start(), CancellationToken.None);
await Stop().ContinueWith(task => Start(), _programCancellationToken);
}
catch (Exception exception)
{
// async void: nothing can observe an exception escaping this handler, so it is logged here.
Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
}
}
 
// WebSocket open handler: logs that the connection is open, then schedules the not-responding
// callback on _webSocketsServerResponseScheduledContinuation with a one-minute TimeSpan and the
// connection's cancellation token.
// The two Schedule lines are the r82 and r83 forms of the same call; only the callback name differs.
private void WebSocketSharp_OnOpen(object sender, EventArgs e)
@@ -283,18 +323,25 @@
{
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open");

_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
}
 
// Callback handed to _webSocketsServerResponseScheduledContinuation.Schedule: logs a warning that
// the server has not responded, waits one second, then restarts the connection by running Stop()
// with Start() as its continuation.
// This span shows the r82 and r83 revisions interleaved (the method is renamed from
// OnUnresponsiveServer to OnServerNotResponding), so near-duplicate neighbouring lines appear to
// be the old statement followed by its replacement.
private async void OnUnresponsiveServer()
private async void OnServerNotResponding()
{
Log.Warning($"Server {_server.Name} has not responded in a long while...");

await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken);
try
{
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);

Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");

// ContinueWith without continuation options runs Start() whatever the outcome of Stop().
await Stop().ContinueWith(task => Start(), CancellationToken.None);
await Stop().ContinueWith(task => Start(), _programCancellationToken);
}
catch (Exception exception)
{
// NOTE(review): the message says "on connection closed", but this path is the not-responding
// callback — it looks copy-pasted from WebSocketSharp_OnClose; consider rewording.
Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
}
}
 
private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e)
@@ -303,11 +350,18 @@
$"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}",
e.Exception);
 
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken);
try
{
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken);
 
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}");
 
await Stop().ContinueWith(task => Start(), CancellationToken.None);
await Stop().ContinueWith(task => Start(), _programCancellationToken);
}
catch (Exception exception)
{
Log.Error(exception, "Error restarting WebSockets connection on connection closed.");
}
}
 
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e)
@@ -316,7 +370,7 @@
{
Log.Information($"Server {_server.Name} sent PING message");
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
return;
}
 
@@ -323,41 +377,6 @@
await _webSocketMessageBufferBlock.SendAsync(new GotifyConnectionData(e.RawData, _server), _cancellationToken);
}
 
// Earlier (r82) form of Stop(); the hunk header above shrinks this region from 41 lines to 6,
// so this appears to be the side of the comparison that is removed from this position.
// Returns immediately when there is no socket or no token source. Otherwise it cancels the token
// source, awaits the heartbeat and past-messages tasks when they are set, detaches the four
// WebSocket handlers, closes the socket, and finally disposes and clears the token source.
public async Task Stop()
{

// Nothing to tear down when either the socket or the token source is missing.
if (_webSocketSharp == null || _cancellationTokenSource == null)
{
return;
}

_cancellationTokenSource.Cancel();

if (_heartBeatTask != null)
{
await _heartBeatTask;
}

if (_retrievePastMessagesTask != null)
{
await _retrievePastMessagesTask;
}

if (_webSocketSharp != null)
{
// Handlers are removed before Close() is called.
_webSocketSharp.OnMessage -= WebSocketSharp_OnMessage;
_webSocketSharp.OnError -= WebSocketSharp_OnError;
_webSocketSharp.OnOpen -= WebSocketSharp_OnOpen;
_webSocketSharp.OnClose -= WebSocketSharp_OnClose;

_webSocketSharp.Close();
_webSocketSharp = null;
}

// The token source is released only after the awaited tasks have finished.
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}
 
#endregion
 
#region Private Methods
@@ -461,7 +480,7 @@
 
Log.Information($"PING response latency for {_server.Name} is {delta}ms");
 
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken);
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken);
 
} while (!cancellationToken.IsCancellationRequested);
}