WingMan – Blame information for rev

Subversion Repositories:
Rev:
Rev Author Line No. Line
5 office 1 using System;
8 office 2 using System.Collections.Generic;
3 using System.Linq;
5 office 4 using System.Net;
6 office 5 using System.Threading;
5 office 6 using System.Threading.Tasks;
7 using MQTTnet;
8 using MQTTnet.Client;
9 using MQTTnet.Extensions.ManagedClient;
8 office 10 using MQTTnet.Protocol;
5 office 11 using MQTTnet.Server;
8 office 12 using WingMan.Utilities;
5 office 13 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
14 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
15  
16 namespace WingMan.Communication
17 {
7 office 18 public class MQTTCommunication : IDisposable
5 office 19 {
/// <summary>Handler signature for <see cref="OnClientAuthenticationFailed" />.</summary>
public delegate void ClientAuthenticationFailed(object sender, EventArgs e);

/// <summary>Handler signature for <see cref="OnClientConnected" />.</summary>
public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

/// <summary>Handler signature for <see cref="OnClientConnectionFailed" />.</summary>
public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

/// <summary>Handler signature for <see cref="OnClientDisconnected" />.</summary>
public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

/// <summary>Handler signature for <see cref="OnClientSubscribed" />.</summary>
public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

/// <summary>Handler signature for <see cref="OnClientUnsubscribed" />.</summary>
public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

/// <summary>Handler signature for <see cref="OnMessageReceived" />.</summary>
public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

/// <summary>Handler signature for <see cref="OnServerAuthenticationFailed" />.</summary>
public delegate void ServerAuthenticationFailed(object sender, EventArgs e);

/// <summary>Handler signature for <see cref="OnServerClientConnected" />.</summary>
public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

/// <summary>Handler signature for <see cref="OnServerClientDisconnected" />.</summary>
public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);

/// <summary>Handler signature for <see cref="OnServerStarted" />.</summary>
public delegate void ServerStarted(object sender, EventArgs e);

/// <summary>Handler signature for <see cref="OnServerStopped" />.</summary>
public delegate void ServerStopped(object sender, EventArgs e);
43  
/// <summary>
///     Creates the communication object along with a managed MQTT client and an MQTT server.
///     Neither is started here; see <see cref="Start" />.
/// </summary>
/// <param name="taskScheduler">Scheduler handed to the continuations that raise the forwarded events.</param>
/// <param name="cancellationToken">Cancellation token handed to those same continuations.</param>
public MQTTCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
    Client = new MqttFactory().CreateManagedMqttClient();
    Server = new MqttFactory().CreateMqttServer();

    // Starts out empty; entries are added by the connection validator.
    TrackedClients = new TrackedClients {Clients = new List<TrackedClient>()};

    CancellationToken = cancellationToken;
    TaskScheduler = taskScheduler;
}
54  
/// <summary>Clients recorded by the server's connection validator, including their banned flag.</summary>
private TrackedClients TrackedClients { get; }

/// <summary>Scheduler on which the event-raising continuations are run.</summary>
private TaskScheduler TaskScheduler { get; }

/// <summary>Managed MQTT client used when <see cref="Type" /> is the client mode.</summary>
private IManagedMqttClient Client { get; }

/// <summary>MQTT server used when <see cref="Type" /> is the server mode.</summary>
private IMqttServer Server { get; }

/// <summary>Set to true once the client or server has been started and to false at the end of <see cref="Stop" />.</summary>
public bool Running { get; set; }

/// <summary>Nickname passed to the last call of <see cref="Start" />.</summary>
public string Nick { get; set; }

/// <summary>Address the client connects to, or the address the server endpoint is bound to.</summary>
private IPAddress IPAddress { get; set; }

/// <summary>TCP port used together with <see cref="IPAddress" />.</summary>
private int Port { get; set; }

/// <summary>Password handed to the AES helper when encrypting and decrypting payloads.</summary>
private string Password { get; set; }

/// <summary>Token handed to the continuations that raise the forwarded events.</summary>
private CancellationToken CancellationToken { get; }

/// <summary>Mode (client or server) selected by the last call of <see cref="Start" />.</summary>
public MQTTCommunicationType Type { get; set; }
76  
/// <summary>
///     Stops whichever of the client or the server is selected by <see cref="Type" />.
/// </summary>
/// <remarks>
///     The method is <c>async void</c>, so it returns to the caller at the first incomplete await and
///     nobody can observe a failure of <see cref="Stop" />. An exception escaping an <c>async void</c>
///     method is rethrown as unhandled, therefore shutdown failures are traced and contained here
///     instead of being allowed to escape from Dispose.
/// </remarks>
public async void Dispose()
{
    try
    {
        await Stop().ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Dispose must not throw; record the failure rather than dropping it silently.
        System.Diagnostics.Trace.TraceError("MQTTCommunication.Dispose: stopping failed: {0}", ex);
    }
}
5 office 81  
/// <summary>Raised in client mode when decrypting a received payload throws.</summary>
public event ClientAuthenticationFailed OnClientAuthenticationFailed;

/// <summary>Raised in server mode when decrypting a received payload throws.</summary>
public event ServerAuthenticationFailed OnServerAuthenticationFailed;

/// <summary>Raised in either mode after a received payload has been decrypted.</summary>
public event MessageReceived OnMessageReceived;

/// <summary>Forwards the managed client's Connected event.</summary>
public event ClientConnected OnClientConnected;

/// <summary>Forwards the managed client's Disconnected event.</summary>
public event ClientDisconnected OnClientDisconnected;

/// <summary>Forwards the managed client's ConnectingFailed event.</summary>
public event ClientConnectionFailed OnClientConnectionFailed;

/// <summary>Forwards the server's ClientUnsubscribedTopic event.</summary>
public event ClientUnsubscribed OnClientUnsubscribed;

/// <summary>Forwards the server's ClientSubscribedTopic event.</summary>
public event ClientSubscribed OnClientSubscribed;

/// <summary>Forwards the server's ClientDisconnected event.</summary>
public event ServerClientDisconnected OnServerClientDisconnected;

/// <summary>Forwards the server's ClientConnected event.</summary>
public event ServerClientConnected OnServerClientConnected;

/// <summary>Forwards the server's Started event.</summary>
public event ServerStarted OnServerStarted;

/// <summary>Forwards the server's Stopped event.</summary>
public event ServerStopped OnServerStopped;
105  
/// <summary>
///     Stores the connection settings and starts either the MQTT client or the MQTT server.
/// </summary>
/// <param name="type">Selects client or server mode; any other value starts nothing.</param>
/// <param name="ipAddress">Address to connect to (client) or to bind to (server).</param>
/// <param name="port">TCP port to connect to or listen on.</param>
/// <param name="nick">Nickname stored in <see cref="Nick" />.</param>
/// <param name="password">Password used for payload encryption and decryption.</param>
public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick, string password)
{
    // Remember the settings first; the start routines read them from the properties.
    Password = password;
    Nick = nick;
    Port = port;
    IPAddress = ipAddress;
    Type = type;

    if (type == MQTTCommunicationType.Client)
    {
        await StartClient().ConfigureAwait(false);
    }
    else if (type == MQTTCommunicationType.Server)
    {
        await StartServer().ConfigureAwait(false);
    }
}
124  
/// <summary>
///     Configures the managed client for <see cref="IPAddress" />:<see cref="Port" /> with a five second
///     auto-reconnect delay, hooks the client events, subscribes to the "lobby" and "exchange" topics
///     and starts the client.
/// </summary>
private async Task StartClient()
{
    // Setup and start a managed MQTT client.
    var managedOptions = new ManagedMqttClientOptionsBuilder()
        .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
        .WithClientOptions(new MqttClientOptionsBuilder()
            .WithTcpServer(IPAddress.ToString(), Port)
            .Build())
        .Build();

    BindClientHandlers();

    // Subscriptions are registered before the client is started, in this order.
    foreach (var topic in new[] {"lobby", "exchange"})
    {
        var filter = new TopicFilterBuilder()
            .WithTopic(topic)
            .Build();

        await Client.SubscribeAsync(filter).ConfigureAwait(false);
    }

    await Client.StartAsync(managedOptions).ConfigureAwait(false);

    Running = true;
}
154  
/// <summary>
///     Detaches the client event handlers and then stops the managed client.
/// </summary>
private async Task StopClient()
{
    UnbindClientHandlers();

    var stopping = Client.StopAsync();
    await stopping.ConfigureAwait(false);
}
161  
/// <summary>
///     Attaches this object's handlers to the managed client's connection and message events.
/// </summary>
public void BindClientHandlers()
{
    Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
    Client.ConnectingFailed += ClientOnConnectingFailed;
    Client.Disconnected += ClientOnDisconnected;
    Client.Connected += ClientOnConnected;
}
169  
/// <summary>
///     Detaches the handlers that <see cref="BindClientHandlers" /> attached to the managed client.
/// </summary>
public void UnbindClientHandlers()
{
    Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
    Client.ConnectingFailed -= ClientOnConnectingFailed;
    Client.Disconnected -= ClientOnDisconnected;
    Client.Connected -= ClientOnConnected;
}
177  
/// <summary>
///     Handles a message received by the client: decrypts the payload in place using
///     <see cref="Password" /> and raises <see cref="OnMessageReceived" />. If decryption throws,
///     <see cref="OnClientAuthenticationFailed" /> is raised instead.
/// </summary>
/// <param name="sender">Sender passed through to the raised event.</param>
/// <param name="e">Received message; its payload is replaced by the decrypted bytes.</param>
private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        try
        {
            e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
        }
        catch (Exception)
        {
            // Any decryption failure is reported as an authentication failure.
            await Task.Delay(0).ContinueWith(_ => OnClientAuthenticationFailed?.Invoke(sender, e),
                CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);

            return;
        }

        await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // The continuation is cancelled when CancellationToken is signalled; awaiting it throws.
        // In an async void method that exception would go unhandled, so cancellation simply
        // means the event is not raised.
    }
}
195  
/// <summary>
///     Forwards the managed client's ConnectingFailed event to <see cref="OnClientConnectionFailed" />
///     through <see cref="TaskScheduler" />.
/// </summary>
private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
{
    try
    {
        await Task.Delay(0).ContinueWith(_ => OnClientConnectionFailed?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // A signalled CancellationToken cancels the continuation and makes the await throw;
        // left unhandled in an async void method that would surface as an unhandled exception.
    }
}
201  
/// <summary>
///     Forwards the managed client's Disconnected event to <see cref="OnClientDisconnected" />
///     through <see cref="TaskScheduler" />.
/// </summary>
private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
    try
    {
        await Task.Delay(0).ContinueWith(_ => OnClientDisconnected?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // A signalled CancellationToken cancels the continuation and makes the await throw;
        // left unhandled in an async void method that would surface as an unhandled exception.
    }
}
207  
/// <summary>
///     Forwards the managed client's Connected event to <see cref="OnClientConnected" />
///     through <see cref="TaskScheduler" />.
/// </summary>
private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
{
    try
    {
        await Task.Delay(0).ContinueWith(_ => OnClientConnected?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // A signalled CancellationToken cancels the continuation and makes the await throw;
        // left unhandled in an async void method that would surface as an unhandled exception.
    }
}
213  
/// <summary>
///     Builds the server options (bound address, port, subscription interceptor and connection
///     validator), hooks the server events and starts the MQTT server.
/// </summary>
private async Task StartServer()
{
    var serverOptions = new MqttServerOptionsBuilder()
        .WithDefaultEndpointBoundIPAddress(IPAddress)
        .WithSubscriptionInterceptor(MQTTSubscriptionIntercept)
        .WithConnectionValidator(MQTTConnectionValidator)
        .WithDefaultEndpointPort(Port)
        .Build();

    BindServerHandlers();

    var starting = Server.StartAsync(serverOptions);
    await starting.ConfigureAwait(false);

    Running = true;
}
228  
/// <summary>
///     Connection validator installed on the server. Refuses the connection when a tracked client
///     with the same endpoint (case-insensitive) or the same client id is marked as banned;
///     otherwise records the connecting client and accepts the connection.
/// </summary>
/// <param name="context">Validation context; its return code is set by this method.</param>
private void MQTTConnectionValidator(MqttConnectionValidatorContext context)
{
    // Do not accept connections from banned clients.
    var banned = TrackedClients.Clients.Any(tracked =>
        tracked.Banned &&
        (string.Equals(tracked.EndPoint, context.Endpoint, StringComparison.OrdinalIgnoreCase) ||
         string.Equals(tracked.ClientId, context.ClientId, StringComparison.Ordinal)));

    if (!banned)
    {
        var newcomer = new TrackedClient {ClientId = context.ClientId, EndPoint = context.Endpoint};
        TrackedClients.Clients.Add(newcomer);

        context.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
        return;
    }

    context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
}
245  
/// <summary>
///     Detaches the server event handlers and then stops the MQTT server.
/// </summary>
/// <remarks>
///     NOTE(review): the handlers, including the one for the server's Stopped event, are removed
///     before the server is stopped, so a Stopped notification raised by this stop would not reach
///     <see cref="OnServerStopped" /> - confirm that this is intended.
/// </remarks>
private async Task StopServer()
{
    UnbindServerHandlers();

    var stopping = Server.StopAsync();
    await stopping.ConfigureAwait(false);
}
252  
/// <summary>
///     Subscription interceptor installed on the server. Only the "lobby" and "exchange" topics may
///     be subscribed to; a subscription to any other topic is rejected and the connection is
///     flagged to be closed.
/// </summary>
/// <param name="context">Interception context; its accept and close flags are set by this method.</param>
private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context)
{
    var topic = context.TopicFilter.Topic;
    var permitted = topic == "lobby" || topic == "exchange";

    context.AcceptSubscription = permitted;
    context.CloseConnection = !permitted;
}
266  
/// <summary>
///     Attaches this object's handlers to the server's lifecycle, client and message events.
/// </summary>
private void BindServerHandlers()
{
    // Server lifecycle.
    Server.Started += ServerOnStarted;
    Server.Stopped += ServerOnStopped;

    // Client connections and subscriptions.
    Server.ClientConnected += ServerOnClientConnected;
    Server.ClientDisconnected += ServerOnClientDisconnected;
    Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
    Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;

    // Incoming messages.
    Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
}
277  
/// <summary>
///     Detaches the handlers that <see cref="BindServerHandlers" /> attached to the server.
/// </summary>
private void UnbindServerHandlers()
{
    // Server lifecycle.
    Server.Started -= ServerOnStarted;
    Server.Stopped -= ServerOnStopped;

    // Client connections and subscriptions.
    Server.ClientConnected -= ServerOnClientConnected;
    Server.ClientDisconnected -= ServerOnClientDisconnected;
    Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
    Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;

    // Incoming messages.
    Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
}
288  
/// <summary>
///     Handles a message received by the server: decrypts the payload in place using
///     <see cref="Password" /> and raises <see cref="OnMessageReceived" />. If decryption throws,
///     every tracked client with the sender's client id is marked as banned, sessions matching that
///     client id or the tracked endpoint are cleared and disconnected, and
///     <see cref="OnServerAuthenticationFailed" /> is raised instead.
/// </summary>
/// <param name="sender">Sender passed through to the raised event.</param>
/// <param name="e">Received message; its payload is replaced by the decrypted bytes.</param>
private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        try
        {
            e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
        }
        catch (Exception)
        {
            // Decryption failed, assume a rogue client and ban the client.
            // Enumerate a snapshot: the connection validator adds to the same list and may run
            // while the awaits below are pending, which would invalidate a live enumerator.
            foreach (var client in TrackedClients.Clients.ToList())
            {
                if (!string.Equals(client.ClientId, e.ClientId, StringComparison.Ordinal))
                    continue;

                client.Banned = true;

                foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync())
                {
                    // Endpoints are compared case-insensitively, as in the connection validator.
                    if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal) &&
                        !string.Equals(clientSessionStatus.Endpoint, client.EndPoint,
                            StringComparison.OrdinalIgnoreCase))
                        continue;

                    await clientSessionStatus.ClearPendingApplicationMessagesAsync();
                    await clientSessionStatus.DisconnectAsync();
                }
            }

            await Task.Delay(0).ContinueWith(_ => OnServerAuthenticationFailed?.Invoke(sender, e),
                CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);

            return;
        }

        await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // The continuation is cancelled when CancellationToken is signalled; awaiting it throws.
        // In an async void method that exception would go unhandled, so cancellation simply
        // means the event is not raised.
    }
}
325  
/// <summary>
///     Forwards the server's ClientUnsubscribedTopic event to <see cref="OnClientUnsubscribed" />
///     through <see cref="TaskScheduler" />.
/// </summary>
private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
{
    try
    {
        await Task.Delay(0).ContinueWith(_ => OnClientUnsubscribed?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // A signalled CancellationToken cancels the continuation and makes the await throw;
        // left unhandled in an async void method that would surface as an unhandled exception.
    }
}
331  
/// <summary>
///     Forwards the server's ClientSubscribedTopic event to <see cref="OnClientSubscribed" />
///     through <see cref="TaskScheduler" />.
/// </summary>
private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
{
    try
    {
        await Task.Delay(0).ContinueWith(_ => OnClientSubscribed?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // A signalled CancellationToken cancels the continuation and makes the await throw;
        // left unhandled in an async void method that would surface as an unhandled exception.
    }
}
337  
/// <summary>
///     Forwards the server's ClientDisconnected event to <see cref="OnServerClientDisconnected" />
///     through <see cref="TaskScheduler" />.
/// </summary>
private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
{
    try
    {
        await Task.Delay(0).ContinueWith(_ => OnServerClientDisconnected?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // A signalled CancellationToken cancels the continuation and makes the await throw;
        // left unhandled in an async void method that would surface as an unhandled exception.
    }
}
343  
/// <summary>
///     Forwards the server's ClientConnected event to <see cref="OnServerClientConnected" />
///     through <see cref="TaskScheduler" />.
/// </summary>
private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
{
    try
    {
        await Task.Delay(0).ContinueWith(_ => OnServerClientConnected?.Invoke(sender, e),
            CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // A signalled CancellationToken cancels the continuation and makes the await throw;
        // left unhandled in an async void method that would surface as an unhandled exception.
    }
}
349  
/// <summary>
///     Forwards the server's Stopped event to <see cref="OnServerStopped" />, synchronously on the
///     calling thread.
/// </summary>
private void ServerOnStopped(object sender, EventArgs e) => OnServerStopped?.Invoke(sender, e);
354  
/// <summary>
///     Forwards the server's Started event to <see cref="OnServerStarted" />, synchronously on the
///     calling thread.
/// </summary>
private void ServerOnStarted(object sender, EventArgs e) => OnServerStarted?.Invoke(sender, e);
359  
/// <summary>
///     Stops the server or the client, depending on <see cref="Type" />, and then clears
///     <see cref="Running" />.
/// </summary>
public async Task Stop()
{
    var mode = Type;

    if (mode == MQTTCommunicationType.Server)
    {
        await StopServer().ConfigureAwait(false);
    }
    else if (mode == MQTTCommunicationType.Client)
    {
        await StopClient().ConfigureAwait(false);
    }

    Running = false;
}
374  
/// <summary>
///     Encrypts <paramref name="payload" /> with <see cref="Password" /> and publishes it on
///     <paramref name="topic" /> through the client or the server, depending on <see cref="Type" />.
/// </summary>
/// <param name="topic">Topic to publish on.</param>
/// <param name="payload">Plain payload; it is encrypted before being published.</param>
public async Task Broadcast(string topic, byte[] payload)
{
    // Encrypt the payload.
    var cipher = await AES.Encrypt(payload, Password);

    var mode = Type;

    if (mode == MQTTCommunicationType.Client)
    {
        // The managed client takes the application message inside its own wrapper.
        var wrapped = new ManagedMqttApplicationMessage
        {
            ApplicationMessage = new MqttApplicationMessage {Topic = topic, Payload = cipher}
        };

        await Client.PublishAsync(wrapped).ConfigureAwait(false);
    }
    else if (mode == MQTTCommunicationType.Server)
    {
        var message = new MqttApplicationMessage {Topic = topic, Payload = cipher};

        await Server.PublishAsync(message).ConfigureAwait(false);
    }
}
395 }
8 office 396 }