WingMan – Rev
?pathlinks?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using MQTTnet.Server;
using WingMan.Utilities;
using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
namespace WingMan.Communication
{
public class MQTTCommunication : IDisposable
{
public delegate void ClientAuthenticationFailed(object sender, EventArgs e);
public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);
public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);
public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);
public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);
public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);
public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);
public delegate void ServerAuthenticationFailed(object sender, EventArgs e);
public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);
public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);
public delegate void ServerStarted(object sender, EventArgs e);
public delegate void ServerStopped(object sender, EventArgs e);
public MQTTCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
TaskScheduler = taskScheduler;
CancellationToken = cancellationToken;
TrackedClients = new TrackedClients {Clients = new List<TrackedClient>()};
Client = new MqttFactory().CreateManagedMqttClient();
Server = new MqttFactory().CreateMqttServer();
}
private TrackedClients TrackedClients { get; }
private TaskScheduler TaskScheduler { get; }
private IManagedMqttClient Client { get; }
private IMqttServer Server { get; }
public bool Running { get; set; }
public string Nick { get; set; }
private IPAddress IPAddress { get; set; }
private int Port { get; set; }
private string Password { get; set; }
private CancellationToken CancellationToken { get; }
public MQTTCommunicationType Type { get; set; }
public async void Dispose()
{
await Stop();
}
public event ClientAuthenticationFailed OnClientAuthenticationFailed;
public event ServerAuthenticationFailed OnServerAuthenticationFailed;
public event MessageReceived OnMessageReceived;
public event ClientConnected OnClientConnected;
public event ClientDisconnected OnClientDisconnected;
public event ClientConnectionFailed OnClientConnectionFailed;
public event ClientUnsubscribed OnClientUnsubscribed;
public event ClientSubscribed OnClientSubscribed;
public event ServerClientDisconnected OnServerClientDisconnected;
public event ServerClientConnected OnServerClientConnected;
public event ServerStarted OnServerStarted;
public event ServerStopped OnServerStopped;
public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick, string password)
{
Type = type;
IPAddress = ipAddress;
Port = port;
Nick = nick;
Password = password;
switch (type)
{
case MQTTCommunicationType.Client:
await StartClient().ConfigureAwait(false);
break;
case MQTTCommunicationType.Server:
await StartServer().ConfigureAwait(false);
break;
}
}
private async Task StartClient()
{
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(IPAddress.ToString(), Port);
// Setup and start a managed MQTT client.
var options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(clientOptions.Build())
.Build();
BindClientHandlers();
await Client.SubscribeAsync(
new TopicFilterBuilder()
.WithTopic("lobby")
.Build()
).ConfigureAwait(false);
await Client.SubscribeAsync(
new TopicFilterBuilder()
.WithTopic("exchange")
.Build()
).ConfigureAwait(false);
await Client.StartAsync(options).ConfigureAwait(false);
Running = true;
}
private async Task StopClient()
{
UnbindClientHandlers();
await Client.StopAsync().ConfigureAwait(false);
}
public void BindClientHandlers()
{
Client.Connected += ClientOnConnected;
Client.Disconnected += ClientOnDisconnected;
Client.ConnectingFailed += ClientOnConnectingFailed;
Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
}
public void UnbindClientHandlers()
{
Client.Connected -= ClientOnConnected;
Client.Disconnected -= ClientOnDisconnected;
Client.ConnectingFailed -= ClientOnConnectingFailed;
Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
}
private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
try
{
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
}
catch (Exception)
{
await Task.Delay(0).ContinueWith(_ => OnClientAuthenticationFailed?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler);
return;
}
await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler);
}
private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
{
await Task.Delay(0).ContinueWith(_ => OnClientConnectionFailed?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
await Task.Delay(0).ContinueWith(_ => OnClientDisconnected?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
{
await Task.Delay(0).ContinueWith(_ => OnClientConnected?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private async Task StartServer()
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpointBoundIPAddress(IPAddress)
.WithSubscriptionInterceptor(MQTTSubscriptionIntercept)
.WithConnectionValidator(MQTTConnectionValidator)
.WithDefaultEndpointPort(Port);
BindServerHandlers();
await Server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false);
Running = true;
}
private void MQTTConnectionValidator(MqttConnectionValidatorContext context)
{
// Do not accept connections from banned clients.
if (TrackedClients.Clients.Any(client =>
(string.Equals(client.EndPoint, context.Endpoint, StringComparison.OrdinalIgnoreCase) ||
string.Equals(client.ClientId, context.ClientId, StringComparison.Ordinal)) &&
client.Banned))
{
context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
return;
}
TrackedClients.Clients.Add(new TrackedClient {ClientId = context.ClientId, EndPoint = context.Endpoint});
context.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
}
private async Task StopServer()
{
UnbindServerHandlers();
await Server.StopAsync().ConfigureAwait(false);
}
private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context)
{
if (context.TopicFilter.Topic != "lobby" &&
context.TopicFilter.Topic != "exchange")
{
context.AcceptSubscription = false;
context.CloseConnection = true;
return;
}
context.AcceptSubscription = true;
context.CloseConnection = false;
}
private void BindServerHandlers()
{
Server.Started += ServerOnStarted;
Server.Stopped += ServerOnStopped;
Server.ClientConnected += ServerOnClientConnected;
Server.ClientDisconnected += ServerOnClientDisconnected;
Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
}
private void UnbindServerHandlers()
{
Server.Started -= ServerOnStarted;
Server.Stopped -= ServerOnStopped;
Server.ClientConnected -= ServerOnClientConnected;
Server.ClientDisconnected -= ServerOnClientDisconnected;
Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
}
private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
try
{
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
}
catch (Exception)
{
// Decryption failed, assume a rogue client and ban the client.
foreach (var client in TrackedClients.Clients)
{
if (!string.Equals(client.ClientId, e.ClientId, StringComparison.Ordinal))
continue;
client.Banned = true;
foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync())
{
if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal) &&
!string.Equals(clientSessionStatus.Endpoint, client.EndPoint, StringComparison.Ordinal))
continue;
await clientSessionStatus.ClearPendingApplicationMessagesAsync();
await clientSessionStatus.DisconnectAsync();
}
}
await Task.Delay(0).ContinueWith(_ => OnServerAuthenticationFailed?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler);
return;
}
await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
{
await Task.Delay(0).ContinueWith(_ => OnClientUnsubscribed?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
{
await Task.Delay(0).ContinueWith(_ => OnClientSubscribed?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
{
await Task.Delay(0).ContinueWith(_ => OnServerClientDisconnected?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
{
await Task.Delay(0).ContinueWith(_ => OnServerClientConnected?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
}
private void ServerOnStopped(object sender, EventArgs e)
{
OnServerStopped?.Invoke(sender, e);
}
private void ServerOnStarted(object sender, EventArgs e)
{
OnServerStarted?.Invoke(sender, e);
}
public async Task Stop()
{
switch (Type)
{
case MQTTCommunicationType.Server:
await StopServer().ConfigureAwait(false);
break;
case MQTTCommunicationType.Client:
await StopClient().ConfigureAwait(false);
break;
}
Running = false;
}
public async Task Broadcast(string topic, byte[] payload)
{
// Encrypt the payload.
var encryptedPayload = await AES.Encrypt(payload, Password);
switch (Type)
{
case MQTTCommunicationType.Client:
await Client.PublishAsync(new ManagedMqttApplicationMessage
{
ApplicationMessage = new MqttApplicationMessage {Topic = topic, Payload = encryptedPayload}
}).ConfigureAwait(false);
break;
case MQTTCommunicationType.Server:
await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = encryptedPayload})
.ConfigureAwait(false);
break;
}
}
}
}