Zzz

Subversion Repositories:
Compare Path: Rev
With Path: Rev
?path1? @ 1  →  ?path2? @ 5
/Zzz/Clients/MqttClient.cs
@@ -7,11 +7,7 @@
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing;
using MQTTnet.Packets;
using Newtonsoft.Json;
using Serilog;
using Zzz.Action;
@@ -60,8 +56,8 @@
 
#region Private Delegates, Events, Enums, Properties, Indexers and Fields
 
private IMqttClientOptions _iMqttClient;
private Configuration.Configuration _configuration;
private MqttClientOptions _iMqttClient;
private readonly Configuration.Configuration _configuration;
 
#endregion
 
@@ -140,7 +136,7 @@
 
using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload)))
{
message.Payload = memoryStream.ToArray();
message.PayloadSegment = new ArraySegment<byte>(memoryStream.ToArray());
message.Topic = _configuration.MqttTopic;
 
await _mqttClient.PublishAsync(message, _mqttCancellationToken);
@@ -158,10 +154,9 @@
_mqttCancellationToken = _mqttCancellationTokenSource.Token;
 
_mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedHandler);
_mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(MqttClientConnectedHandler);
_mqttClient.ApplicationMessageReceivedHandler =
new MqttApplicationMessageReceivedHandlerDelegate(MqttApplicationMessageReceivedHandler);
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
 
_iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort,
_configuration.MqttUsername, _configuration.MqttPassword);
@@ -176,44 +171,8 @@
}
}
 
public async Task Stop()
private async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
if (_mqttCancellationTokenSource != null)
{
_mqttCancellationTokenSource.Cancel();
 
if (_mqttClient != null && _mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
 
MqttDisconnected?.Invoke(this, EventArgs.Empty);
 
_mqttClient?.Dispose();
_mqttClient = null;
}
 
_mqttCancellationTokenSource = null;
}
}
 
#endregion
 
#region Private Methods
 
/// <summary>
/// Restarts the MQTT client. When MQTT is disabled in the configuration
/// the client is only stopped and not started again.
/// </summary>
/// <returns>
/// A task that completes once the client has been stopped and, when MQTT
/// is enabled, <c>Start</c> has run to completion.
/// </returns>
public async Task Restart()
{
    await Stop();

    if (!_configuration.MqttEnable)
    {
        return;
    }

    // Await Start directly instead of chaining it with ContinueWith and an
    // async lambda: that form produces a Task<Task> of which only the outer
    // task is awaited, so Restart used to complete before Start had finished
    // and any exception thrown by Start went unobserved.
    await Start();
}
 
private void MqttApplicationMessageReceivedHandler(MqttApplicationMessageReceivedEventArgs arg)
{
// Do not bother with MQTT clients that do not set a client ID.
if (string.IsNullOrEmpty(arg.ClientId))
{
@@ -220,7 +179,7 @@
return;
}
 
var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array ?? Array.Empty<byte>());
 
Log.Information($"Message from MQTT broker received: {payload}");
 
@@ -243,7 +202,7 @@
case "ACTION":
try
{
var message = (ZzzAction) JsonConvert.DeserializeObject(payload, typeof(ZzzAction));
var message = (ZzzAction)JsonConvert.DeserializeObject(payload, typeof(ZzzAction));
 
if (message == null)
{
@@ -261,7 +220,7 @@
case "STATE":
try
{
var message = (ZzzState) JsonConvert.DeserializeObject(payload, typeof(ZzzState));
var message = (ZzzState)JsonConvert.DeserializeObject(payload, typeof(ZzzState));
 
if (message == null)
{
@@ -279,7 +238,7 @@
}
}
 
private async Task MqttClientConnectedHandler(MqttClientConnectedEventArgs args)
private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
if (!_configuration.MqttEnable)
{
@@ -322,7 +281,7 @@
}
}
 
private async Task MqttClientDisconnectedHandler(MqttClientDisconnectedEventArgs args)
private async Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
if (!_configuration.MqttEnable)
{
@@ -351,10 +310,46 @@
}
}
 
private static IMqttClientOptions BuildMqttClient(string server, int port, string username, string password)
/// <summary>
/// Stops the MQTT client: cancels the current cancellation token source,
/// disconnects from the broker when a connection is open, raises
/// <c>MqttDisconnected</c> after a successful disconnect and releases the
/// client and the token source.
/// </summary>
/// <returns>A task that completes once the client has been torn down.</returns>
/// <remarks>
/// Does nothing when no cancellation token source exists.
/// </remarks>
public async Task Stop()
{
    if (_mqttCancellationTokenSource == null)
    {
        return;
    }

    _mqttCancellationTokenSource.Cancel();

    try
    {
        if (_mqttClient != null && _mqttClient.IsConnected)
        {
            await _mqttClient.DisconnectAsync();

            MqttDisconnected?.Invoke(this, EventArgs.Empty);
        }
    }
    finally
    {
        // Release the client even when it was not connected or when the
        // disconnect failed; otherwise a later start would create a second
        // client while this one is still alive with its handlers attached.
        // The null-conditional guards against the field having been cleared
        // while the disconnect was awaited.
        _mqttClient?.Dispose();
        _mqttClient = null;

        // The token source is IDisposable and owned by this class.
        _mqttCancellationTokenSource?.Dispose();
        _mqttCancellationTokenSource = null;
    }
}
 
#endregion
 
#region Private Methods
 
/// <summary>
/// Restarts the MQTT client. When MQTT is disabled in the configuration
/// the client is only stopped and not started again.
/// </summary>
/// <returns>
/// A task that completes once the client has been stopped and, when MQTT
/// is enabled, <c>Start</c> has run to completion.
/// </returns>
public async Task Restart()
{
    await Stop();

    if (!_configuration.MqttEnable)
    {
        return;
    }

    // Await Start directly instead of chaining it with ContinueWith and an
    // async lambda: that form produces a Task<Task> of which only the outer
    // task is awaited, so Restart used to complete before Start had finished
    // and any exception thrown by Start went unobserved.
    await Start();
}
 
private static MqttClientOptions BuildMqttClient(string server, int port, string username, string password)
{
return new MqttClientOptionsBuilder()
.WithCommunicationTimeout(TimeSpan.FromMinutes(1))
.WithTimeout(TimeSpan.FromMinutes(1))
.WithTcpServer(server, port)
.WithCredentials(username, password)
.Build();