Korero – Rev 1

Subversion Repositories:
Rev:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Korero.Properties;
using Korero.Serialization;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing;
using Serilog;

namespace Korero.Communication
{
    public class MqttCommunication : IDisposable
    {
        #region Static Fields and Constants

        private static CSV _notifications;

        #endregion

        #region Public Events & Delegates

        public event EventHandler<MqttNotificationEventArgs> NotificationReceived;

        public event EventHandler<MqttConnectedEventArgs> Connected;

        public event EventHandler<MqttDisconnectedEventArgs> Disconnected;

        #endregion

        #region Public Enums, Properties and Fields

        public bool IsConnected { get; private set; }

        #endregion

        #region Private Delegates, Events, Enums, Properties, Indexers and Fields

        private readonly ConcurrentDictionary<Guid, TaskCompletionSource<Callback>> _mqttCommandTaskCompletionSource;

        private readonly ScheduledContinuation _scheduledContinuation;

        private Guid _id;

        private IMqttClient _mqttClient;

        private IMqttClientOptions _mqttClientOptions;

        private MqttClientSubscribeOptions _mqttSubScribeOptions;

        private string _mqttTopic;

        #endregion

        #region Constructors, Destructors and Finalizers

        public MqttCommunication()
        {
            _notifications = new CSV(new[]
            {
                "group", "message", "map", "coarse", "parcel", "teleport", "statistics", "friendship", "balance",
                "heartbeat"
            });
            _scheduledContinuation = new ScheduledContinuation();
            _id = Guid.NewGuid();
            _mqttCommandTaskCompletionSource = new ConcurrentDictionary<Guid, TaskCompletionSource<Callback>>();
            Settings.Default.PropertyChanged += Default_PropertyChanged;
        }

        public void Dispose()
        {
            Settings.Default.PropertyChanged -= Default_PropertyChanged;

            if (_mqttClient == null)
            {
                return;
            }

            _mqttClient?.Dispose();
            _mqttClient = null;
        }

        #endregion

        #region Event Handlers

        private void Default_PropertyChanged(object sender, PropertyChangedEventArgs e)
        {
            if (e.PropertyName != Settings.Default.MqttHost &&
                e.PropertyName != Settings.Default.MqttPort &&
                e.PropertyName != Settings.Default.Group &&
                e.PropertyName != Settings.Default.Password)
            {
                return;
            }

            _scheduledContinuation.Schedule(1000, Reconnect);
        }

        #endregion

        #region Public Methods

        public async Task Stop()
        {
            if (_mqttClient == null)
            {
                return;
            }

            if (_mqttClient.IsConnected)
            {
                await _mqttClient.DisconnectAsync();
            }

            if (_mqttClient != null)
            {
                _mqttClient?.Dispose();
                _mqttClient = null;
            }
        }

        public async Task Start()
        {
            if (!int.TryParse(Settings.Default.MqttPort, out var mqttPort))
            {
                return;
            }

            _mqttClientOptions = new MqttClientOptionsBuilder()
                .WithCommunicationTimeout(TimeSpan.FromMinutes(1))
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(1))
                .WithClientId(_id.ToString())
                .WithTcpServer(Settings.Default.MqttHost, mqttPort)
                .Build();

            _mqttClient = new MqttFactory().CreateMqttClient();
            _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(MqttConnectedHandler);
            _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttDisconnected);
            _mqttClient.ApplicationMessageReceivedHandler =
                new MqttApplicationMessageReceivedHandlerDelegate(MqttMessageReceived);

            await _mqttClient.ConnectAsync(_mqttClientOptions);
        }

        public async Task<Callback> SendCommand(Command command, CancellationToken cancellationToken)
        {
            // Do not send the command if MQTT is not connected.
            if (_mqttClient == null || !_mqttClient.IsConnected)
            {
                Log.Warning("Command could not be processed because the MQTT client was not connected.");

                return null;
            }

            var mqttApplicationMessage = new MqttApplicationMessageBuilder()
                .WithTopic(_mqttTopic)
                .WithPayload(command.Payload)
                .Build();

            var commandTaskCompletionSource = new TaskCompletionSource<Callback>();

            if (!_mqttCommandTaskCompletionSource.TryAdd(command.Id, commandTaskCompletionSource))
            {
                return null;
            }

            try
            {
                await _mqttClient.PublishAsync(mqttApplicationMessage, cancellationToken);

                #pragma warning disable 4014
                Task.Delay(Constants.CommandTimeout, cancellationToken)
                    .ContinueWith(task => commandTaskCompletionSource.TrySetResult(null), CancellationToken.None);
                #pragma warning restore 4014

                var callBack = await commandTaskCompletionSource.Task;

                return callBack;
            }
            catch (Exception ex)
            {
                Log.Warning(ex, "Unable to publish MQTT message.");

                return null;
            }
            finally
            {
                // Ensure that the task is removed.
                _mqttCommandTaskCompletionSource.TryRemove(command.Id, out _);
            }
        }

        public void Restart()
        {
            _scheduledContinuation.Schedule(250, Reconnect);
        }

        #endregion

        #region Private Methods

        private async void Reconnect()
        {
            await Stop().ContinueWith(async task => await Start());
        }

        private void MqttMessageReceived(MqttApplicationMessageReceivedEventArgs arg)
        {
            if (arg.ApplicationMessage == null)
            {
                return;
            }

            if (arg.ApplicationMessage.Topic != _mqttTopic)
            {
                return;
            }

            var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);

            var decode = KeyValue.Decode(payload);

            var corradeMessage = decode.ToDictionary(
                kvp => WebUtility.UrlDecode(kvp.Key),
                kvp => WebUtility.UrlDecode(kvp.Value));

            if (corradeMessage.ContainsKey("command"))
            {
                if (!corradeMessage.ContainsKey("success"))
                {
                    return;
                }

                // Ignore commands that do not have an afterburn id.
                if (!corradeMessage.ContainsKey("_id"))
                {
                    return;
                }

                var textId = corradeMessage["_id"];

                var id = new Guid(textId);

                if (_mqttCommandTaskCompletionSource.TryRemove(id, out var commandTaskCompletionSource))
                {
                    commandTaskCompletionSource.TrySetResult(new Callback(corradeMessage));
                }

                return;
            }

            if (corradeMessage.ContainsKey("notification"))
            {
                NotificationReceived?.Invoke(this, new MqttNotificationEventArgs(new Notification(corradeMessage)));
            }
        }

        private async Task MqttDisconnected(MqttClientDisconnectedEventArgs arg)
        {
            IsConnected = false;

            Disconnected?.Invoke(this, new MqttDisconnectedEventArgs(arg.Exception));

            await Task.Delay(TimeSpan.FromSeconds(1));

            await _mqttClient.ConnectAsync(_mqttClientOptions);
        }

        private async Task MqttConnectedHandler(MqttClientConnectedEventArgs arg)
        {
            switch (arg.ConnectResult.ResultCode)
            {
                case MqttClientConnectResultCode.Success:
                    _mqttTopic = $"{Settings.Default.Group}/{Settings.Default.Password}/{_notifications}";

                    _mqttSubScribeOptions = new MqttClientSubscribeOptionsBuilder()
                        .WithTopicFilter(new MqttTopicFilter {Topic = _mqttTopic})
                        .Build();

                    var mqttClientSubscribeResult = await _mqttClient.SubscribeAsync(_mqttSubScribeOptions);
                    if (mqttClientSubscribeResult.Items.All(result =>
                        result.ResultCode == MqttClientSubscribeResultCode.GrantedQoS0))
                    {
                        if (!string.IsNullOrEmpty(await GetVersion()))
                        {
                            IsConnected = true;

                            Connected?.Invoke(this, new MqttConnectedEventArgs());

                            await ProcessOfflineMessages();
                        }
                    }

                    break;
            }
        }

        private async Task ProcessOfflineMessages()
        {
            var data = new Dictionary<string, string>
            {
                {"command", "processofflinemessages"},
                {"group", Settings.Default.Group},
                {"password", Settings.Default.Password}
            };

            var callback = await SendCommand(new Command(data), CancellationToken.None);

            if (callback == null || !callback.Success)
            {
            }
        }

        private async Task<string> GetVersion()
        {
            var data = new Dictionary<string, string>
            {
                {"command", "version"},
                {"group", Settings.Default.Group},
                {"password", Settings.Default.Password}
            };

            var callback = await SendCommand(new Command(data), CancellationToken.None);

            if (callback == null || !callback.Success)
            {
                return string.Empty;
            }

            return callback.Data.FirstOrDefault();
        }

        #endregion
    }
}

Generated by GNU Enscript 1.6.5.90.