/trunk/Winify/Gotify/GotifyConnection.cs |
@@ -58,6 +58,9 @@ |
|
private Task _retrievePastMessagesTask; |
private static JsonSerializer _jsonSerializer; |
private readonly CancellationToken _programCancellationToken; |
private CancellationTokenSource _localCancellationTokenSource; |
private CancellationToken _localCancellationToken; |
|
#endregion |
|
@@ -118,10 +121,11 @@ |
new DataflowLinkOptions { PropagateCompletion = true }); |
} |
|
public GotifyConnection(Server server, Configuration.Configuration configuration) : this() |
public GotifyConnection(Server server, Configuration.Configuration configuration, CancellationToken cancellationToken) : this() |
{ |
_server = server; |
_configuration = configuration; |
_programCancellationToken = cancellationToken; |
|
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12; |
var httpClientHandler = new HttpClientHandler |
@@ -172,10 +176,10 @@ |
|
public void Dispose() |
{ |
if (_cancellationTokenSource != null) |
if (_localCancellationTokenSource != null) |
{ |
_cancellationTokenSource.Dispose(); |
_cancellationTokenSource = null; |
_localCancellationTokenSource.Dispose(); |
_localCancellationTokenSource = null; |
} |
|
if (_tplWebSocketsBufferBlockTransformLink != null) |
@@ -207,6 +211,32 @@ |
|
#region Public Methods |
|
public async Task Stop() |
{ |
_localCancellationTokenSource.Cancel(); |
|
if (_heartBeatTask != null) |
{ |
await _heartBeatTask; |
} |
|
if (_retrievePastMessagesTask != null) |
{ |
await _retrievePastMessagesTask; |
} |
|
if (_webSocketSharp != null) |
{ |
_webSocketSharp.OnMessage -= WebSocketSharp_OnMessage; |
_webSocketSharp.OnError -= WebSocketSharp_OnError; |
_webSocketSharp.OnOpen -= WebSocketSharp_OnOpen; |
_webSocketSharp.OnClose -= WebSocketSharp_OnClose; |
|
_webSocketSharp.Close(); |
_webSocketSharp = null; |
} |
} |
|
public void Start() |
{ |
if (_webSocketsUri == null || _httpUri == null) |
@@ -215,7 +245,11 @@ |
return; |
} |
|
_cancellationTokenSource = new CancellationTokenSource(); |
|
_localCancellationTokenSource = new CancellationTokenSource(); |
_localCancellationToken = _localCancellationTokenSource.Token; |
|
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(new [] { _programCancellationToken, _localCancellationToken }); |
_cancellationToken = _cancellationTokenSource.Token; |
|
if (_webSocketSharp != null) |
@@ -269,14 +303,20 @@ |
|
private async void WebSocketSharp_OnClose(object sender, CloseEventArgs e) |
{ |
Log.Information( |
$"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}"); |
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} closed with reason {e.Reason}"); |
|
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken); |
try |
{ |
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken); |
|
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
|
await Stop().ContinueWith(task => Start(), CancellationToken.None); |
await Stop().ContinueWith(task => Start(), _programCancellationToken); |
} |
catch (Exception exception) |
{ |
Log.Error(exception, "Error restarting WebSockets connection on connection closed."); |
} |
} |
|
private void WebSocketSharp_OnOpen(object sender, EventArgs e) |
@@ -283,18 +323,25 @@ |
{ |
Log.Information($"WebSockets connection to server {_webSocketsUri.AbsoluteUri} is now open"); |
|
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken); |
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken); |
} |
|
private async void OnUnresponsiveServer() |
private async void OnServerNotResponding() |
{ |
Log.Warning($"Server {_server.Name} has not responded in a long while..."); |
|
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken); |
try |
{ |
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken); |
|
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
|
await Stop().ContinueWith(task => Start(), CancellationToken.None); |
await Stop().ContinueWith(task => Start(), _programCancellationToken); |
} |
catch (Exception exception) |
{ |
Log.Error(exception, "Error restarting WebSockets connection on connection closed."); |
} |
} |
|
private async void WebSocketSharp_OnError(object sender, ErrorEventArgs e) |
@@ -303,11 +350,18 @@ |
$"Connection to WebSockets server {_webSocketsUri.AbsoluteUri} terminated unexpectedly with message {e.Message}", |
e.Exception); |
|
await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken); |
try |
{ |
await Task.Delay(TimeSpan.FromSeconds(1), _programCancellationToken); |
|
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
Log.Information($"Reconnecting to websocket server {_webSocketsUri.AbsoluteUri}"); |
|
await Stop().ContinueWith(task => Start(), CancellationToken.None); |
await Stop().ContinueWith(task => Start(), _programCancellationToken); |
} |
catch (Exception exception) |
{ |
Log.Error(exception, "Error restarting WebSockets connection on connection closed."); |
} |
} |
|
private async void WebSocketSharp_OnMessage(object sender, MessageEventArgs e) |
@@ -316,7 +370,7 @@ |
{ |
Log.Information($"Server {_server.Name} sent PING message"); |
|
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken); |
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken); |
return; |
} |
|
@@ -323,41 +377,6 @@ |
await _webSocketMessageBufferBlock.SendAsync(new GotifyConnectionData(e.RawData, _server), _cancellationToken); |
} |
|
public async Task Stop() |
{ |
|
if (_webSocketSharp == null || _cancellationTokenSource == null) |
{ |
return; |
} |
|
_cancellationTokenSource.Cancel(); |
|
if (_heartBeatTask != null) |
{ |
await _heartBeatTask; |
} |
|
if (_retrievePastMessagesTask != null) |
{ |
await _retrievePastMessagesTask; |
} |
|
if (_webSocketSharp != null) |
{ |
_webSocketSharp.OnMessage -= WebSocketSharp_OnMessage; |
_webSocketSharp.OnError -= WebSocketSharp_OnError; |
_webSocketSharp.OnOpen -= WebSocketSharp_OnOpen; |
_webSocketSharp.OnClose -= WebSocketSharp_OnClose; |
|
_webSocketSharp.Close(); |
_webSocketSharp = null; |
} |
|
_cancellationTokenSource.Dispose(); |
_cancellationTokenSource = null; |
} |
|
#endregion |
|
#region Private Methods |
@@ -461,7 +480,7 @@ |
|
Log.Information($"PING response latency for {_server.Name} is {delta}ms"); |
|
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnUnresponsiveServer, _cancellationToken); |
_webSocketsServerResponseScheduledContinuation.Schedule(TimeSpan.FromMinutes(1), OnServerNotResponding, _cancellationToken); |
|
} while (!cancellationToken.IsCancellationRequested); |
} |