Zzz – Rev 5
?pathlinks?
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using Newtonsoft.Json;
using Serilog;
using Zzz.Action;
using Zzz.Properties;
using Zzz.State;
namespace Zzz.Clients
{
public class MqttClient : IDisposable
{
#region Static Fields and Constants
private static IMqttClient _mqttClient;
private static CancellationTokenSource _mqttCancellationTokenSource;
private static CancellationToken _mqttCancellationToken;
#endregion
#region Public Events & Delegates
public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeSucceeded;
public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeFailed;
public event EventHandler MqttConnectionSucceeded;
public event EventHandler MqttConnectionFailed;
public event EventHandler MqttDisconnected;
public event EventHandler<MqttStateReceivedEventArgs> MqttStateReceived;
public event EventHandler<MqttActionReceivedEventArgs> MqttActionReceived;
#endregion
#region Public Enums, Properties and Fields
public bool Connected { get; set; }
public bool Subscribed { get; set; }
#endregion
#region Private Delegates, Events, Enums, Properties, Indexers and Fields
private MqttClientOptions _iMqttClient;
private readonly Configuration.Configuration _configuration;
#endregion
#region Constructors, Destructors and Finalizers
public MqttClient(Configuration.Configuration configuration) : this()
{
_configuration = configuration;
}
private MqttClient()
{
MqttSubscribeSucceeded += MqttClient_MqttSubscribeSucceeded;
MqttSubscribeFailed += MqttClient_MqttSubscribeFailed;
MqttConnectionSucceeded += MqttClient_MqttConnectionSucceeded;
MqttConnectionFailed += MqttClient_MqttConnectionFailed;
MqttDisconnected += MqttClient_MqttDisconnected;
}
public void Dispose()
{
_mqttCancellationTokenSource?.Cancel();
_mqttCancellationTokenSource = null;
_mqttClient?.Dispose();
_mqttClient = null;
}
#endregion
#region Event Handlers
private void MqttClient_MqttDisconnected(object sender, EventArgs e)
{
Connected = false;
Subscribed = false;
}
private void MqttClient_MqttConnectionFailed(object sender, EventArgs e)
{
Connected = false;
Subscribed = false;
}
private void MqttClient_MqttConnectionSucceeded(object sender, EventArgs e)
{
Connected = true;
}
private void MqttClient_MqttSubscribeFailed(object sender, MqttClientSubscribeResultCode e)
{
Subscribed = false;
}
private void MqttClient_MqttSubscribeSucceeded(object sender, MqttClientSubscribeResultCode e)
{
Subscribed = true;
}
#endregion
#region Public Methods
public async Task Publish(Action.Action action)
{
try
{
if (_mqttClient == null || !_mqttClient.IsConnected)
{
return;
}
var message = new MqttApplicationMessage();
var payload = JsonConvert.SerializeObject(new ZzzAction(action));
using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload)))
{
message.PayloadSegment = new ArraySegment<byte>(memoryStream.ToArray());
message.Topic = _configuration.MqttTopic;
await _mqttClient.PublishAsync(message, _mqttCancellationToken);
}
}
catch (Exception ex)
{
Log.Warning(ex, "Unable to publish sleep event to MQTT broker.");
}
}
public async Task Start()
{
_mqttCancellationTokenSource = new CancellationTokenSource();
_mqttCancellationToken = _mqttCancellationTokenSource.Token;
_mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
_iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort,
_configuration.MqttUsername, _configuration.MqttPassword);
try
{
await _mqttClient.ConnectAsync(_iMqttClient, _mqttCancellationToken);
}
catch(Exception exception)
{
Log.Information(exception, "MQTT connection has been cancelled.");
}
}
private async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
// Do not bother with MQTT clients that do not set a client ID.
if (string.IsNullOrEmpty(arg.ClientId))
{
return;
}
var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array ?? Array.Empty<byte>());
Log.Information($"Message from MQTT broker received: {payload}");
// Only listen on configured MQTT topic.
var topic = arg.ApplicationMessage.Topic.Split('/');
var messageTopic = new string[topic.Length - 1];
Array.Copy(topic, messageTopic, topic.Length - 1);
// Only listen on configured topic.
if (string.Join("/", messageTopic) != _configuration.MqttTopic)
{
return;
}
var zzzTrigger = topic[topic.Length - 1].ToUpperInvariant();
switch (zzzTrigger)
{
case "ACTION":
try
{
var message = (ZzzAction)JsonConvert.DeserializeObject(payload, typeof(ZzzAction));
if (message == null)
{
throw new ArgumentException("Invalid action received from MQTT broker.");
}
MqttActionReceived?.Invoke(this, new MqttActionReceivedEventArgs(message));
}
catch (Exception ex)
{
Log.Warning(ex, "Unable to decode action change MQTT request.");
}
break;
case "STATE":
try
{
var message = (ZzzState)JsonConvert.DeserializeObject(payload, typeof(ZzzState));
if (message == null)
{
throw new ArgumentException("Invalid state received from MQTT broker.");
}
MqttStateReceived?.Invoke(this, new MqttStateReceivedEventArgs(message));
}
catch (Exception ex)
{
Log.Warning(ex, "Unable to decode state change MQTT request.");
}
break;
}
}
private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
if (!_configuration.MqttEnable)
{
return;
}
MqttConnectionSucceeded?.Invoke(this, EventArgs.Empty);
try
{
var subscribe = await _mqttClient.SubscribeAsync(
new MqttClientSubscribeOptions
{
TopicFilters = new List<MqttTopicFilter>
{
new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/Action"},
new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/State"}
}
}, _mqttCancellationToken);
foreach (var sub in subscribe.Items)
{
switch (sub.ResultCode)
{
case MqttClientSubscribeResultCode.GrantedQoS0:
case MqttClientSubscribeResultCode.GrantedQoS1:
case MqttClientSubscribeResultCode.GrantedQoS2:
MqttSubscribeSucceeded?.Invoke(this, sub.ResultCode);
break;
default:
MqttSubscribeFailed?.Invoke(this, sub.ResultCode);
break;
}
}
}
catch
{
MqttConnectionFailed?.Invoke(this, EventArgs.Empty);
}
}
private async Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
if (!_configuration.MqttEnable)
{
return;
}
MqttDisconnected?.Invoke(this, EventArgs.Empty);
try
{
await Task.Delay(TimeSpan.FromSeconds(1), _mqttCancellationToken);
if (_mqttClient.IsConnected)
{
return;
}
await _mqttClient.ConnectAsync(
BuildMqttClient(_configuration.MqttServer, (int)_configuration.MqttPort,
_configuration.MqttUsername,
_configuration.MqttPassword), _mqttCancellationToken);
}
catch (Exception ex)
{
Log.Warning(ex, "Disconnected from MQTT server.");
}
}
public async Task Stop()
{
if (_mqttCancellationTokenSource != null)
{
_mqttCancellationTokenSource.Cancel();
if (_mqttClient != null && _mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
MqttDisconnected?.Invoke(this, EventArgs.Empty);
_mqttClient?.Dispose();
_mqttClient = null;
}
_mqttCancellationTokenSource = null;
}
}
#endregion
#region Private Methods
public async Task Restart()
{
if (!_configuration.MqttEnable)
{
await Stop();
return;
}
await Stop().ContinueWith(async _ => await Start());
}
private static MqttClientOptions BuildMqttClient(string server, int port, string username, string password)
{
return new MqttClientOptionsBuilder()
.WithTimeout(TimeSpan.FromMinutes(1))
.WithTcpServer(server, port)
.WithCredentials(username, password)
.Build();
}
#endregion
}
}
Generated by GNU Enscript 1.6.5.90.