// Korero – Rev 1
// ?pathlinks?
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Korero.Properties;
using Korero.Serialization;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Subscribing;
using Serilog;
namespace Korero.Communication
{
/// <summary>
///     Wraps an MQTTnet client: connects to the broker configured in <see cref="Settings" />, publishes
///     commands, matches command replies to their callers by id, and raises events for connection state
///     changes and for received notifications.
/// </summary>
public class MqttCommunication : IDisposable
{
    #region Static Fields and Constants

    // Delay handed to the scheduler when a connection-relevant setting changed.
    // NOTE(review): presumably milliseconds - confirm against ScheduledContinuation.Schedule.
    private const int SettingsChangedDelay = 1000;

    // Delay handed to the scheduler when a restart is explicitly requested (same unit as above).
    private const int RestartDelay = 250;

    // Pause before trying to re-establish a connection that was lost.
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(1);

    // Notification types that make up the last segment of the MQTT topic.
    private static readonly CSV _notifications = new CSV(new[]
    {
        "group", "message", "map", "coarse", "parcel", "teleport", "statistics", "friendship", "balance",
        "heartbeat"
    });

    #endregion

    #region Public Events & Delegates

    /// <summary>Raised for every received message that carries a "notification" key.</summary>
    public event EventHandler<MqttNotificationEventArgs> NotificationReceived;

    /// <summary>Raised once the client is connected, subscribed and the version query was answered.</summary>
    public event EventHandler<MqttConnectedEventArgs> Connected;

    /// <summary>Raised whenever the underlying MQTT client reports a disconnect.</summary>
    public event EventHandler<MqttDisconnectedEventArgs> Disconnected;

    #endregion

    #region Public Enums, Properties and Fields

    /// <summary>
    ///     True after the connect sequence (connect, subscribe, version query) succeeded and until the
    ///     underlying client reports a disconnect.
    /// </summary>
    public bool IsConnected { get; private set; }

    #endregion

    #region Private Delegates, Events, Enums, Properties, Indexers and Fields

    // Commands that were published and are still waiting for their reply, keyed by command id.
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<Callback>> _mqttCommandTaskCompletionSource;

    private readonly ScheduledContinuation _scheduledContinuation;

    // Used as the MQTT client id.
    private readonly Guid _id;

    // The client currently in use; null while stopped or after disposal.
    private IMqttClient _mqttClient;

    private IMqttClientOptions _mqttClientOptions;

    private MqttClientSubscribeOptions _mqttSubScribeOptions;

    // Topic used both for the subscription and for publishing commands; set when a connection succeeds.
    private string _mqttTopic;

    #endregion

    #region Constructors, Destructors and Finalizers

    /// <summary>
    ///     Creates the wrapper and starts listening for settings changes. No connection is made until
    ///     <see cref="Start" /> is called.
    /// </summary>
    public MqttCommunication()
    {
        _scheduledContinuation = new ScheduledContinuation();
        _id = Guid.NewGuid();
        _mqttCommandTaskCompletionSource = new ConcurrentDictionary<Guid, TaskCompletionSource<Callback>>();

        Settings.Default.PropertyChanged += Default_PropertyChanged;
    }

    /// <summary>Stops listening for settings changes and disposes the MQTT client, if any.</summary>
    public void Dispose()
    {
        Settings.Default.PropertyChanged -= Default_PropertyChanged;

        // Detach the client before disposing it so that the disconnected handler does not try to
        // reconnect a disposed client.
        var client = _mqttClient;
        _mqttClient = null;
        client?.Dispose();
    }

    #endregion

    #region Event Handlers

    /// <summary>Schedules a reconnect when one of the settings the connection depends on changed.</summary>
    private void Default_PropertyChanged(object sender, PropertyChangedEventArgs e)
    {
        // PropertyName carries the name of the setting, so it has to be compared with the setting
        // names and not with the current setting values.
        switch (e.PropertyName)
        {
            case nameof(Settings.Default.MqttHost):
            case nameof(Settings.Default.MqttPort):
            case nameof(Settings.Default.Group):
            case nameof(Settings.Default.Password):
                _scheduledContinuation.Schedule(SettingsChangedDelay, Reconnect);
                break;
        }
    }

    #endregion

    #region Public Methods

    /// <summary>Disconnects (when connected) and disposes the current MQTT client.</summary>
    /// <returns>A task that completes when the client has been disconnected and disposed.</returns>
    public async Task Stop()
    {
        var client = _mqttClient;
        if (client == null)
        {
            return;
        }

        // Detach first: the disconnected handler only reconnects the client that is still current,
        // so a deliberate stop is not followed by an automatic reconnect.
        _mqttClient = null;

        try
        {
            if (client.IsConnected)
            {
                await client.DisconnectAsync();
            }
        }
        finally
        {
            // Release the client even when the disconnect failed.
            client.Dispose();
        }
    }

    /// <summary>
    ///     Creates a new MQTT client from the current settings and connects it. Does nothing (apart from
    ///     logging a warning) when the configured port is not a valid integer.
    /// </summary>
    /// <returns>A task that completes when the connection attempt has finished.</returns>
    public async Task Start()
    {
        if (!int.TryParse(Settings.Default.MqttPort, out var mqttPort))
        {
            Log.Warning("MQTT client could not be started because the configured port is not a valid number.");
            return;
        }

        // Release a client left over from an earlier start so that it is not leaked.
        var previousClient = _mqttClient;
        _mqttClient = null;
        previousClient?.Dispose();

        var options = new MqttClientOptionsBuilder()
            .WithCommunicationTimeout(TimeSpan.FromMinutes(1))
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(1))
            .WithClientId(_id.ToString())
            .WithTcpServer(Settings.Default.MqttHost, mqttPort)
            .Build();
        _mqttClientOptions = options;

        var client = new MqttFactory().CreateMqttClient();
        client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(MqttConnectedHandler);
        // Bind the handler to this very client so that it can tell whether it is still the current one.
        client.DisconnectedHandler =
            new MqttClientDisconnectedHandlerDelegate(arg => MqttDisconnected(client, arg));
        client.ApplicationMessageReceivedHandler =
            new MqttApplicationMessageReceivedHandlerDelegate(MqttMessageReceived);
        _mqttClient = client;

        await client.ConnectAsync(options);
    }

    /// <summary>
    ///     Publishes a command and waits for the reply that carries the same id.
    /// </summary>
    /// <param name="command">The command to publish; its id is used to match the reply.</param>
    /// <param name="cancellationToken">Cancels the publish and the wait for the reply.</param>
    /// <returns>
    ///     The reply, or null when the client is not connected, a command with the same id is already
    ///     pending, publishing failed, the wait was cancelled or no reply arrived within
    ///     <c>Constants.CommandTimeout</c>.
    /// </returns>
    public async Task<Callback> SendCommand(Command command, CancellationToken cancellationToken)
    {
        // Work on a local copy: Stop() may clear the field at any time.
        var client = _mqttClient;

        // Do not send the command if MQTT is not connected.
        if (client == null || !client.IsConnected)
        {
            Log.Warning("Command could not be processed because the MQTT client was not connected.");
            return null;
        }

        var mqttApplicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic(_mqttTopic)
            .WithPayload(command.Payload)
            .Build();

        // Continuations must not run inline on the thread that delivers the reply.
        var pendingReply =
            new TaskCompletionSource<Callback>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_mqttCommandTaskCompletionSource.TryAdd(command.Id, pendingReply))
        {
            Log.Warning("Command could not be processed because a command with the same id is already pending.");
            return null;
        }

        try
        {
            // The linked source fires on caller cancellation and, once armed below, on timeout.
            // Disposing it on exit releases the timer as soon as the reply has arrived.
            using (var timeoutSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
            using (timeoutSource.Token.Register(() => pendingReply.TrySetResult(null)))
            {
                await client.PublishAsync(mqttApplicationMessage, cancellationToken);

                // The timeout only starts counting after the command has been published.
                timeoutSource.CancelAfter(Constants.CommandTimeout);

                return await pendingReply.Task;
            }
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Unable to publish MQTT message.");
            return null;
        }
        finally
        {
            // Ensure that the task is removed.
            _mqttCommandTaskCompletionSource.TryRemove(command.Id, out _);
        }
    }

    /// <summary>Schedules a stop followed by a start of the MQTT client.</summary>
    public void Restart()
    {
        _scheduledContinuation.Schedule(RestartDelay, Reconnect);
    }

    #endregion

    #region Private Methods

    /// <summary>
    ///     Stops and then starts the client. Failures are logged rather than thrown because nothing can
    ///     observe an exception that escapes an async void method.
    /// </summary>
    private async void Reconnect()
    {
        try
        {
            await Stop();
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Unable to stop the MQTT client before reconnecting.");
        }

        // Start even when stopping failed, so that a broken client is replaced by a fresh one.
        try
        {
            await Start();
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Unable to start the MQTT client while reconnecting.");
        }
    }

    /// <summary>
    ///     Handles a message received on the subscribed topic: command replies complete the matching
    ///     pending command, notifications are forwarded through <see cref="NotificationReceived" />.
    /// </summary>
    private void MqttMessageReceived(MqttApplicationMessageReceivedEventArgs arg)
    {
        var applicationMessage = arg.ApplicationMessage;
        if (applicationMessage == null || applicationMessage.Topic != _mqttTopic)
        {
            return;
        }

        // A message without a payload cannot be decoded.
        var rawPayload = applicationMessage.Payload;
        if (rawPayload == null)
        {
            return;
        }

        Dictionary<string, string> corradeMessage;
        try
        {
            corradeMessage = KeyValue.Decode(Encoding.UTF8.GetString(rawPayload))
                .ToDictionary(
                    kvp => WebUtility.UrlDecode(kvp.Key),
                    kvp => WebUtility.UrlDecode(kvp.Value));
        }
        catch (ArgumentException ex)
        {
            // Duplicate or null keys: the message comes from the network and must not break the handler.
            Log.Warning(ex, "Ignoring malformed MQTT message.");
            return;
        }

        if (corradeMessage.ContainsKey("command"))
        {
            if (!corradeMessage.ContainsKey("success"))
            {
                return;
            }

            // Ignore commands that do not have a (well-formed) afterburn id.
            if (!corradeMessage.TryGetValue("_id", out var textId) || !Guid.TryParse(textId, out var id))
            {
                return;
            }

            if (_mqttCommandTaskCompletionSource.TryRemove(id, out var pendingReply))
            {
                pendingReply.TrySetResult(new Callback(corradeMessage));
            }

            return;
        }

        if (corradeMessage.ContainsKey("notification"))
        {
            NotificationReceived?.Invoke(this, new MqttNotificationEventArgs(new Notification(corradeMessage)));
        }
    }

    /// <summary>
    ///     Publishes the disconnect and, when the connection was lost rather than stopped on purpose,
    ///     tries to connect again after a short delay.
    /// </summary>
    /// <param name="client">The client this handler was registered on.</param>
    /// <param name="arg">Details of the disconnect reported by the client.</param>
    private async Task MqttDisconnected(IMqttClient client, MqttClientDisconnectedEventArgs arg)
    {
        IsConnected = false;
        Disconnected?.Invoke(this, new MqttDisconnectedEventArgs(arg.Exception));

        await Task.Delay(ReconnectDelay);

        // Stop(), Dispose() or a newer Start() may have retired this client in the meantime; in that
        // case it must not be touched any more.
        if (!ReferenceEquals(client, _mqttClient) || client.IsConnected)
        {
            return;
        }

        try
        {
            await client.ConnectAsync(_mqttClientOptions);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Unable to reconnect to the MQTT broker.");
        }
    }

    /// <summary>
    ///     Completes the connect sequence: subscribes to the topic, queries the version and, when both
    ///     succeed, marks the connection as established and requests the offline messages.
    /// </summary>
    private async Task MqttConnectedHandler(MqttClientConnectedEventArgs arg)
    {
        if (arg.ConnectResult.ResultCode != MqttClientConnectResultCode.Success)
        {
            return;
        }

        var client = _mqttClient;
        if (client == null)
        {
            return;
        }

        _mqttTopic = $"{Settings.Default.Group}/{Settings.Default.Password}/{_notifications}";
        _mqttSubScribeOptions = new MqttClientSubscribeOptionsBuilder()
            .WithTopicFilter(new MqttTopicFilter {Topic = _mqttTopic})
            .Build();

        var subscribeResult = await client.SubscribeAsync(_mqttSubScribeOptions);
        var subscriptionGranted = subscribeResult.Items.All(item =>
            item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS0);
        if (!subscriptionGranted)
        {
            Log.Warning("MQTT subscription was not granted.");
            return;
        }

        // The connection only counts as established once the version command has been answered.
        if (string.IsNullOrEmpty(await GetVersion()))
        {
            Log.Warning("MQTT connection was established but the version could not be retrieved.");
            return;
        }

        IsConnected = true;
        Connected?.Invoke(this, new MqttConnectedEventArgs());

        await ProcessOfflineMessages();
    }

    /// <summary>Sends the "processofflinemessages" command and logs a warning when it fails.</summary>
    private async Task ProcessOfflineMessages()
    {
        var callback = await SendCommand(
            new Command(CreateCommandData("processofflinemessages")), CancellationToken.None);
        if (callback == null || !callback.Success)
        {
            Log.Warning("Offline messages could not be processed.");
        }
    }

    /// <summary>Sends the "version" command.</summary>
    /// <returns>
    ///     The first data item of the reply, or an empty string when the command failed.
    /// </returns>
    private async Task<string> GetVersion()
    {
        var callback = await SendCommand(new Command(CreateCommandData("version")), CancellationToken.None);
        if (callback == null || !callback.Success)
        {
            return string.Empty;
        }

        return callback.Data.FirstOrDefault();
    }

    /// <summary>Builds the key/value data of a command authenticated with the configured group and password.</summary>
    /// <param name="command">The name of the command.</param>
    private static Dictionary<string, string> CreateCommandData(string command)
    {
        return new Dictionary<string, string>
        {
            {"command", command},
            {"group", Settings.Default.Group},
            {"password", Settings.Default.Password}
        };
    }

    #endregion
}
}
// Generated by GNU Enscript 1.6.5.90.