Zzz – Rev 5

Subversion Repositories:
Rev:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using Newtonsoft.Json;
using Serilog;
using Zzz.Action;
using Zzz.Properties;
using Zzz.State;

namespace Zzz.Clients
{
    /// <summary>
    ///     Wrapper around an MQTTnet client that connects to the broker named in the supplied configuration,
    ///     subscribes to the <c>Action</c> and <c>State</c> sub-topics of the configured topic, publishes
    ///     actions and raises events for connection, subscription and incoming-message changes.
    /// </summary>
    public class MqttClient : IDisposable
    {
        #region Public Events & Delegates

        /// <summary>Raised for every topic filter the broker granted (QoS 0, 1 or 2).</summary>
        public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeSucceeded;

        /// <summary>Raised for every topic filter the broker answered with a non-granted result code.</summary>
        public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeFailed;

        /// <summary>Raised when the underlying client reports that it connected.</summary>
        public event EventHandler MqttConnectionSucceeded;

        /// <summary>Raised when subscribing after a successful connection throws.</summary>
        public event EventHandler MqttConnectionFailed;

        /// <summary>Raised when the underlying client reports a disconnect and after <see cref="Stop" /> disconnects.</summary>
        public event EventHandler MqttDisconnected;

        /// <summary>Raised when a message on the <c>State</c> sub-topic has been deserialized.</summary>
        public event EventHandler<MqttStateReceivedEventArgs> MqttStateReceived;

        /// <summary>Raised when a message on the <c>Action</c> sub-topic has been deserialized.</summary>
        public event EventHandler<MqttActionReceivedEventArgs> MqttActionReceived;

        #endregion

        #region Public Enums, Properties and Fields

        /// <summary>Set to <c>true</c> on connection success and to <c>false</c> on disconnect or connection failure.</summary>
        public bool Connected { get; set; }

        /// <summary>
        ///     Set to <c>true</c> when a subscription is granted and to <c>false</c> when one is refused, on
        ///     disconnect or on connection failure.
        /// </summary>
        public bool Subscribed { get; set; }

        #endregion

        #region Private Delegates, Events, Enums, Properties, Indexers and Fields

        private readonly Configuration.Configuration _configuration;

        // Connection state is kept per instance so that disposing or restarting one wrapper never tears
        // down the client owned by another.
        private IMqttClient _mqttClient;

        private CancellationTokenSource _mqttCancellationTokenSource;

        private CancellationToken _mqttCancellationToken;

        // Options used for the initial connection attempt made by Start().
        private MqttClientOptions _mqttClientOptions;

        #endregion

        #region Constructors, Destructors and Finalizers

        /// <summary>Creates a client that reads its broker settings from <paramref name="configuration" />.</summary>
        /// <param name="configuration">Source of the MQTT server, port, credentials, topic and enable flag.</param>
        public MqttClient(Configuration.Configuration configuration) : this()
        {
            _configuration = configuration;
        }

        // Wires the instance's own events to the handlers that keep Connected / Subscribed up to date.
        private MqttClient()
        {
            MqttSubscribeSucceeded += MqttClient_MqttSubscribeSucceeded;
            MqttSubscribeFailed += MqttClient_MqttSubscribeFailed;
            MqttConnectionSucceeded += MqttClient_MqttConnectionSucceeded;
            MqttConnectionFailed += MqttClient_MqttConnectionFailed;
            MqttDisconnected += MqttClient_MqttDisconnected;
        }

        /// <summary>Cancels pending operations and disposes the cancellation source and the underlying client.</summary>
        public void Dispose()
        {
            ReleaseResources();
        }

        #endregion

        #region Event Handlers

        private void MqttClient_MqttDisconnected(object sender, EventArgs e)
        {
            Connected = false;
            Subscribed = false;
        }

        private void MqttClient_MqttConnectionFailed(object sender, EventArgs e)
        {
            Connected = false;
            Subscribed = false;
        }

        private void MqttClient_MqttConnectionSucceeded(object sender, EventArgs e)
        {
            Connected = true;
        }

        private void MqttClient_MqttSubscribeFailed(object sender, MqttClientSubscribeResultCode e)
        {
            Subscribed = false;
        }

        private void MqttClient_MqttSubscribeSucceeded(object sender, MqttClientSubscribeResultCode e)
        {
            Subscribed = true;
        }

        #endregion

        #region Public Methods

        /// <summary>
        ///     Serializes <paramref name="action" /> as a JSON <c>ZzzAction</c> and publishes it to the configured
        ///     topic. Does nothing when the client is missing or not connected; failures are logged, not thrown.
        /// </summary>
        /// <param name="action">The action to publish.</param>
        public async Task Publish(Action.Action action)
        {
            try
            {
                var client = _mqttClient;
                if (client == null || !client.IsConnected)
                {
                    return;
                }

                var payload = JsonConvert.SerializeObject(new ZzzAction(action));

                var message = new MqttApplicationMessage
                {
                    Topic = _configuration.MqttTopic,
                    PayloadSegment = new ArraySegment<byte>(Encoding.UTF8.GetBytes(payload))
                };

                await client.PublishAsync(message, _mqttCancellationToken);
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Unable to publish sleep event to MQTT broker.");
            }
        }

        /// <summary>
        ///     Creates a fresh underlying client, attaches the connection and message handlers and attempts to
        ///     connect using the configured server, port and credentials. Connection errors are logged, not thrown.
        /// </summary>
        public async Task Start()
        {
            // Release whatever an earlier Start() left behind so the old client and token source do not leak.
            ReleaseResources();

            var cancellationTokenSource = new CancellationTokenSource();
            _mqttCancellationTokenSource = cancellationTokenSource;
            _mqttCancellationToken = cancellationTokenSource.Token;

            var client = new MqttFactory().CreateMqttClient();
            client.DisconnectedAsync += _mqttClient_DisconnectedAsync;
            client.ConnectedAsync += _mqttClient_ConnectedAsync;
            client.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
            _mqttClient = client;

            _mqttClientOptions = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort,
                _configuration.MqttUsername, _configuration.MqttPassword);

            try
            {
                await client.ConnectAsync(_mqttClientOptions, _mqttCancellationToken);
            }
            catch (OperationCanceledException exception)
            {
                Log.Information(exception, "MQTT connection has been cancelled.");
            }
            catch (Exception exception)
            {
                Log.Warning(exception, "Unable to connect to MQTT broker.");
            }
        }

        /// <summary>
        ///     Cancels pending operations, disconnects the client if it is connected (raising
        ///     <see cref="MqttDisconnected" />) and releases the client and the cancellation source.
        ///     Does nothing when the client has not been started.
        /// </summary>
        public async Task Stop()
        {
            var cancellationTokenSource = _mqttCancellationTokenSource;
            if (cancellationTokenSource == null)
            {
                return;
            }

            // Cancel before disconnecting so the disconnect handler's reconnect delay aborts instead of
            // re-establishing the connection that is being shut down.
            cancellationTokenSource.Cancel();

            try
            {
                var client = _mqttClient;
                if (client != null && client.IsConnected)
                {
                    await client.DisconnectAsync();

                    MqttDisconnected?.Invoke(this, EventArgs.Empty);
                }
            }
            finally
            {
                // Always release, even when the client was not connected or the disconnect threw.
                ReleaseResources();
            }
        }

        /// <summary>
        ///     Stops the client and, when MQTT is enabled in the configuration, starts it again. When MQTT is
        ///     disabled the client is only stopped.
        /// </summary>
        public async Task Restart()
        {
            if (!_configuration.MqttEnable)
            {
                await Stop();

                return;
            }

            try
            {
                await Stop();
            }
            catch (Exception ex)
            {
                // A failed shutdown must not prevent the client from being started again.
                Log.Warning(ex, "Unable to stop MQTT client cleanly before restart.");
            }

            await Start();
        }

        #endregion

        #region Private Methods

        // Handles an incoming message: logs the payload and dispatches "<configured topic>/Action" and
        // "<configured topic>/State" messages to the matching event. Other topics are ignored.
        private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            // Do not bother with MQTT clients that do not set a client ID.
            if (string.IsNullOrEmpty(arg.ClientId))
            {
                return Task.CompletedTask;
            }

            var payload = DecodePayload(arg.ApplicationMessage.PayloadSegment);

            // Constant template + property: JSON payloads contain braces, which Serilog would otherwise parse
            // as template holes. ":l" renders the string without surrounding quotes.
            Log.Information("Message from MQTT broker received: {Payload:l}", payload);

            // Split the topic at its last separator: everything before it must equal the configured topic,
            // the final level selects the kind of message.
            var topic = arg.ApplicationMessage.Topic ?? string.Empty;
            var separatorIndex = topic.LastIndexOf('/');
            var parentTopic = separatorIndex < 0 ? string.Empty : topic.Substring(0, separatorIndex);

            // Only listen on configured topic.
            if (!string.Equals(parentTopic, _configuration.MqttTopic, StringComparison.Ordinal))
            {
                return Task.CompletedTask;
            }

            var zzzTrigger = topic.Substring(separatorIndex + 1).ToUpperInvariant();

            switch (zzzTrigger)
            {
                case "ACTION":
                    HandleActionMessage(payload);
                    break;
                case "STATE":
                    HandleStateMessage(payload);
                    break;
            }

            return Task.CompletedTask;
        }

        // Raises MqttConnectionSucceeded, then subscribes to the Action and State sub-topics and reports the
        // outcome of every topic filter. A throwing subscribe is reported as a connection failure.
        private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            if (!_configuration.MqttEnable)
            {
                return;
            }

            MqttConnectionSucceeded?.Invoke(this, EventArgs.Empty);

            try
            {
                var subscribe = await _mqttClient.SubscribeAsync(
                    new MqttClientSubscribeOptions
                    {
                        TopicFilters = new List<MqttTopicFilter>
                        {
                            new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/Action"},
                            new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/State"}
                        }
                    }, _mqttCancellationToken);

                foreach (var sub in subscribe.Items)
                {
                    switch (sub.ResultCode)
                    {
                        case MqttClientSubscribeResultCode.GrantedQoS0:
                        case MqttClientSubscribeResultCode.GrantedQoS1:
                        case MqttClientSubscribeResultCode.GrantedQoS2:
                            MqttSubscribeSucceeded?.Invoke(this, sub.ResultCode);
                            break;

                        default:
                            MqttSubscribeFailed?.Invoke(this, sub.ResultCode);
                            break;
                    }
                }
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Unable to subscribe to MQTT topics.");

                MqttConnectionFailed?.Invoke(this, EventArgs.Empty);
            }
        }

        // Raises MqttDisconnected and, after a one second pause, tries to connect again unless the client
        // reconnected in the meantime or the operation was cancelled.
        private async Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            if (!_configuration.MqttEnable)
            {
                return;
            }

            MqttDisconnected?.Invoke(this, EventArgs.Empty);

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(1), _mqttCancellationToken);

                // The field may have been cleared by Stop() / Dispose() while waiting.
                var client = _mqttClient;
                if (client == null || client.IsConnected)
                {
                    return;
                }

                await client.ConnectAsync(
                    BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort,
                        _configuration.MqttUsername,
                        _configuration.MqttPassword), _mqttCancellationToken);
            }
            catch (OperationCanceledException ex)
            {
                // Expected while stopping: the reconnect attempt is abandoned on purpose.
                Log.Debug(ex, "MQTT reconnection has been cancelled.");
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Disconnected from MQTT server.");
            }
        }

        // Deserializes an action payload and raises MqttActionReceived; failures are logged.
        private void HandleActionMessage(string payload)
        {
            try
            {
                var message = JsonConvert.DeserializeObject<ZzzAction>(payload);

                if (message == null)
                {
                    throw new ArgumentException("Invalid action received from MQTT broker.");
                }

                MqttActionReceived?.Invoke(this, new MqttActionReceivedEventArgs(message));
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Unable to decode action change MQTT request.");
            }
        }

        // Deserializes a state payload and raises MqttStateReceived; failures are logged.
        private void HandleStateMessage(string payload)
        {
            try
            {
                var message = JsonConvert.DeserializeObject<ZzzState>(payload);

                if (message == null)
                {
                    throw new ArgumentException("Invalid state received from MQTT broker.");
                }

                MqttStateReceived?.Invoke(this, new MqttStateReceivedEventArgs(message));
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Unable to decode state change MQTT request.");
            }
        }

        // Decodes exactly the bytes covered by the segment (honouring Offset and Count) as UTF-8.
        // A segment without a backing array or without content yields an empty string.
        private static string DecodePayload(ArraySegment<byte> payloadSegment)
        {
            if (payloadSegment.Array == null || payloadSegment.Count == 0)
            {
                return string.Empty;
            }

            return Encoding.UTF8.GetString(payloadSegment.Array, payloadSegment.Offset, payloadSegment.Count);
        }

        // Cancels and disposes the cancellation source, detaches the handlers from the underlying client and
        // disposes it. Safe to call repeatedly and when nothing has been started.
        private void ReleaseResources()
        {
            var cancellationTokenSource = _mqttCancellationTokenSource;
            var client = _mqttClient;

            _mqttCancellationTokenSource = null;
            _mqttClient = null;

            try
            {
                cancellationTokenSource?.Cancel();
            }
            finally
            {
                cancellationTokenSource?.Dispose();

                if (client != null)
                {
                    client.DisconnectedAsync -= _mqttClient_DisconnectedAsync;
                    client.ConnectedAsync -= _mqttClient_ConnectedAsync;
                    client.ApplicationMessageReceivedAsync -= _mqttClient_ApplicationMessageReceivedAsync;
                    client.Dispose();
                }
            }
        }

        // Builds TCP connection options with a one minute timeout for the given server and credentials.
        private static MqttClientOptions BuildMqttClient(string server, int port, string username, string password)
        {
            return new MqttClientOptionsBuilder()
                .WithTimeout(TimeSpan.FromMinutes(1))
                .WithTcpServer(server, port)
                .WithCredentials(username, password)
                .Build();
        }

        #endregion
    }
}

Generated by GNU Enscript 1.6.5.90.