/Zzz/Clients/MqttClient.cs |
@@ -7,11 +7,7 @@ |
using System.Threading.Tasks; |
using MQTTnet; |
using MQTTnet.Client; |
using MQTTnet.Client.Connecting; |
using MQTTnet.Client.Disconnecting; |
using MQTTnet.Client.Options; |
using MQTTnet.Client.Receiving; |
using MQTTnet.Client.Subscribing; |
using MQTTnet.Packets; |
using Newtonsoft.Json; |
using Serilog; |
using Zzz.Action; |
@@ -60,8 +56,8 @@ |
|
#region Private Delegates, Events, Enums, Properties, Indexers and Fields |
|
private IMqttClientOptions _iMqttClient; |
private Configuration.Configuration _configuration; |
private MqttClientOptions _iMqttClient; |
private readonly Configuration.Configuration _configuration; |
|
#endregion |
|
@@ -140,7 +136,7 @@ |
|
using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) |
{ |
message.Payload = memoryStream.ToArray(); |
message.PayloadSegment = new ArraySegment<byte>(memoryStream.ToArray()); |
message.Topic = _configuration.MqttTopic; |
|
await _mqttClient.PublishAsync(message, _mqttCancellationToken); |
@@ -158,10 +154,9 @@ |
_mqttCancellationToken = _mqttCancellationTokenSource.Token; |
|
_mqttClient = new MqttFactory().CreateMqttClient(); |
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedHandler); |
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(MqttClientConnectedHandler); |
_mqttClient.ApplicationMessageReceivedHandler = |
new MqttApplicationMessageReceivedHandlerDelegate(MqttApplicationMessageReceivedHandler); |
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; |
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; |
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; |
|
_iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort, |
_configuration.MqttUsername, _configuration.MqttPassword); |
@@ -176,44 +171,8 @@ |
} |
} |
|
public async Task Stop() |
private async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) |
{ |
if (_mqttCancellationTokenSource != null) |
{ |
_mqttCancellationTokenSource.Cancel(); |
|
if (_mqttClient != null && _mqttClient.IsConnected) |
{ |
await _mqttClient.DisconnectAsync(); |
|
MqttDisconnected?.Invoke(this, EventArgs.Empty); |
|
_mqttClient?.Dispose(); |
_mqttClient = null; |
} |
|
_mqttCancellationTokenSource = null; |
} |
} |
|
#endregion |
|
#region Private Methods |
|
/// <summary>
/// Restarts the MQTT client: the current session is always stopped, and a new
/// one is started only when MQTT is enabled in the configuration.
/// </summary>
/// <returns>
/// A task that completes once <c>Stop()</c> and, when MQTT is enabled,
/// <c>Start()</c> have both finished.
/// </returns>
public async Task Restart()
{
    // Both branches begin by tearing down the current session.
    await Stop();

    if (!_configuration.MqttEnable)
    {
        return;
    }

    // Await Start() directly. The previous Stop().ContinueWith(async ...) form
    // produced a Task<Task>; awaiting it returned before Start() had finished
    // and left exceptions from both Stop() and Start() unobserved.
    await Start();
}
|
private void MqttApplicationMessageReceivedHandler(MqttApplicationMessageReceivedEventArgs arg) |
{ |
// Do not bother with MQTT clients that do not set a client ID. |
if (string.IsNullOrEmpty(arg.ClientId)) |
{ |
@@ -220,7 +179,7 @@ |
return; |
} |
|
var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload); |
var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array ?? Array.Empty<byte>()); |
|
Log.Information($"Message from MQTT broker received: {payload}"); |
|
@@ -243,7 +202,7 @@ |
case "ACTION": |
try |
{ |
var message = (ZzzAction) JsonConvert.DeserializeObject(payload, typeof(ZzzAction)); |
var message = (ZzzAction)JsonConvert.DeserializeObject(payload, typeof(ZzzAction)); |
|
if (message == null) |
{ |
@@ -261,7 +220,7 @@ |
case "STATE": |
try |
{ |
var message = (ZzzState) JsonConvert.DeserializeObject(payload, typeof(ZzzState)); |
var message = (ZzzState)JsonConvert.DeserializeObject(payload, typeof(ZzzState)); |
|
if (message == null) |
{ |
@@ -279,7 +238,7 @@ |
} |
} |
|
private async Task MqttClientConnectedHandler(MqttClientConnectedEventArgs args) |
private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) |
{ |
if (!_configuration.MqttEnable) |
{ |
@@ -322,7 +281,7 @@ |
} |
} |
|
private async Task MqttClientDisconnectedHandler(MqttClientDisconnectedEventArgs args) |
private async Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) |
{ |
if (!_configuration.MqttEnable) |
{ |
@@ -351,10 +310,46 @@ |
} |
} |
|
private static IMqttClientOptions BuildMqttClient(string server, int port, string username, string password) |
/// <summary>
/// Stops the MQTT client: cancels outstanding work, disconnects from the
/// broker when connected, and releases the client and the cancellation token
/// source. Does nothing when no cancellation token source exists.
/// </summary>
/// <returns>A task that completes once the client has been shut down.</returns>
public async Task Stop()
{
    if (_mqttCancellationTokenSource == null)
    {
        return;
    }

    // Signal cancellation first so work observing the token can stop.
    _mqttCancellationTokenSource.Cancel();

    try
    {
        if (_mqttClient != null && _mqttClient.IsConnected)
        {
            await _mqttClient.DisconnectAsync();

            // Only raised when a live connection was actually closed.
            MqttDisconnected?.Invoke(this, EventArgs.Empty);
        }
    }
    finally
    {
        // Release the client even when it was not connected or the disconnect
        // failed; otherwise the old instance and its handlers are leaked.
        _mqttClient?.Dispose();
        _mqttClient = null;

        // Null-conditional: the field may have been replaced during the await.
        _mqttCancellationTokenSource?.Dispose();
        _mqttCancellationTokenSource = null;
    }
}
|
#endregion |
|
#region Private Methods |
|
/// <summary>
/// Restarts the MQTT client: the current session is always stopped, and a new
/// one is started only when MQTT is enabled in the configuration.
/// </summary>
/// <returns>
/// A task that completes once <c>Stop()</c> and, when MQTT is enabled,
/// <c>Start()</c> have both finished.
/// </returns>
public async Task Restart()
{
    // Both branches begin by tearing down the current session.
    await Stop();

    if (!_configuration.MqttEnable)
    {
        return;
    }

    // Await Start() directly. The previous Stop().ContinueWith(async ...) form
    // produced a Task<Task>; awaiting it returned before Start() had finished
    // and left exceptions from both Stop() and Start() unobserved.
    await Start();
}
|
private static MqttClientOptions BuildMqttClient(string server, int port, string username, string password) |
{ |
return new MqttClientOptionsBuilder() |
.WithCommunicationTimeout(TimeSpan.FromMinutes(1)) |
.WithTimeout(TimeSpan.FromMinutes(1)) |
.WithTcpServer(server, port) |
.WithCredentials(username, password) |
.Build(); |