WingMan – Blame information for rev 34

Subversion Repositories:
Rev:
Rev Author Line No. Line
10 office 1 using System;
12 office 2 using System.IO;
34 office 3 using System.IO.Compression;
10 office 4 using System.Net;
5 using System.Threading;
6 using System.Threading.Tasks;
34 office 7 using LZ4;
10 office 8 using MQTTnet;
9 using MQTTnet.Client;
10 using MQTTnet.Extensions.ManagedClient;
11 using MQTTnet.Protocol;
12 using MQTTnet.Server;
13 using WingMan.Utilities;
14 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
15 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
16  
17 namespace WingMan.Communication
18 {
/// <summary>
///     Runs either an MQTTnet managed client or an MQTTnet server (chosen by the
///     <see cref="MqttCommunicationType" /> passed to <see cref="Start" />) and re-raises their
///     events on the <see cref="System.Threading.Tasks.TaskScheduler" /> supplied to the constructor.
///     Outgoing payloads are LZ4-compressed and then AES-encrypted with the configured password;
///     incoming payloads are decrypted and decompressed before <see cref="OnMessageReceived" /> fires.
/// </summary>
public class MqttCommunication : IDisposable
{
    /// <summary>Signature of <see cref="OnClientAuthenticationFailed" /> subscribers.</summary>
    public delegate void ClientAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

    /// <summary>Signature of <see cref="OnClientConnected" /> subscribers.</summary>
    public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

    /// <summary>Signature of <see cref="OnClientConnectionFailed" /> subscribers.</summary>
    public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

    /// <summary>Signature of <see cref="OnClientDisconnected" /> subscribers.</summary>
    public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

    /// <summary>Signature of <see cref="OnClientSubscribed" /> subscribers.</summary>
    public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

    /// <summary>Signature of <see cref="OnClientUnsubscribed" /> subscribers.</summary>
    public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

    /// <summary>Signature of <see cref="OnMessageReceived" /> subscribers.</summary>
    public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

    /// <summary>Signature of <see cref="OnServerAuthenticationFailed" /> subscribers.</summary>
    public delegate void ServerAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

    /// <summary>Signature of <see cref="OnServerClientConnected" /> subscribers.</summary>
    public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

    /// <summary>Signature of <see cref="OnServerClientDisconnected" /> subscribers.</summary>
    public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);

    /// <summary>Signature of <see cref="OnServerStarted" /> subscribers.</summary>
    public delegate void ServerStarted(object sender, EventArgs e);

    /// <summary>Signature of <see cref="OnServerStopped" /> subscribers.</summary>
    public delegate void ServerStopped(object sender, EventArgs e);

    // Topics the client subscribes to before it is started.
    private static readonly string[] ClientTopics = {"lobby", "exchange", "execute"};

    /// <summary>
    ///     Creates the managed client and the server objects; neither is started until
    ///     <see cref="Start" /> is called.
    /// </summary>
    /// <param name="taskScheduler">Scheduler on which every public event of this class is raised.</param>
    /// <param name="cancellationToken">Token that cancels pending event notifications.</param>
    public MqttCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
    {
        TaskScheduler = taskScheduler;
        CancellationToken = cancellationToken;

        var factory = new MqttFactory();
        Client = factory.CreateManagedMqttClient();
        Server = factory.CreateMqttServer();
    }

    private TaskScheduler TaskScheduler { get; }

    private IManagedMqttClient Client { get; }

    private IMqttServer Server { get; }

    /// <summary>True after a successful <see cref="Start" />; reset to false by <see cref="Stop" />.</summary>
    public bool Running { get; set; }

    /// <summary>Nickname passed to the last <see cref="Start" /> call.</summary>
    public string Nick { get; set; }

    private IPAddress IpAddress { get; set; }

    private int Port { get; set; }

    // Used as the AES key material for payload encryption/decryption.
    private string Password { get; set; }

    private CancellationToken CancellationToken { get; }

    /// <summary>Role (client or server) selected by the last <see cref="Start" /> call.</summary>
    public MqttCommunicationType Type { get; set; }

    /// <summary>
    ///     Stops whichever role is active. The stop is asynchronous and is not awaited by the caller
    ///     (<see cref="IDisposable.Dispose" /> cannot be awaited), so a failure is traced here instead
    ///     of escaping as an unobserved exception from an async void method.
    /// </summary>
    public async void Dispose()
    {
        try
        {
            await Stop();
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError("MqttCommunication: stopping during Dispose failed: {0}", ex);
        }
    }

    /// <summary>Raised on the client when an incoming payload cannot be decrypted or decompressed.</summary>
    public event ClientAuthenticationFailed OnClientAuthenticationFailed;

    /// <summary>
    ///     Raised on the server when an incoming payload cannot be decrypted or decompressed; the
    ///     sending client's session is disconnected first.
    /// </summary>
    public event ServerAuthenticationFailed OnServerAuthenticationFailed;

    /// <summary>Raised (client and server) with the decrypted, decompressed payload in place.</summary>
    public event MessageReceived OnMessageReceived;

    /// <summary>Forwards the managed client's Connected event.</summary>
    public event ClientConnected OnClientConnected;

    /// <summary>Forwards the managed client's Disconnected event.</summary>
    public event ClientDisconnected OnClientDisconnected;

    /// <summary>Forwards the managed client's ConnectingFailed event.</summary>
    public event ClientConnectionFailed OnClientConnectionFailed;

    /// <summary>Forwards the server's ClientUnsubscribedTopic event.</summary>
    public event ClientUnsubscribed OnClientUnsubscribed;

    /// <summary>Forwards the server's ClientSubscribedTopic event.</summary>
    public event ClientSubscribed OnClientSubscribed;

    /// <summary>Forwards the server's ClientDisconnected event.</summary>
    public event ServerClientDisconnected OnServerClientDisconnected;

    /// <summary>Forwards the server's ClientConnected event.</summary>
    public event ServerClientConnected OnServerClientConnected;

    /// <summary>Forwards the server's Started event.</summary>
    public event ServerStarted OnServerStarted;

    /// <summary>Forwards the server's Stopped event.</summary>
    public event ServerStopped OnServerStopped;

    /// <summary>
    ///     Stores the connection settings and starts the requested role.
    /// </summary>
    /// <param name="type">Whether to run as client or as server.</param>
    /// <param name="ipAddress">Server address to connect to (client) or to bind to (server).</param>
    /// <param name="port">TCP port of the endpoint.</param>
    /// <param name="nick">Nickname stored in <see cref="Nick" />.</param>
    /// <param name="password">Password used to encrypt and decrypt payloads.</param>
    /// <returns>True when the role was started; false for a failed server start or an unknown type.</returns>
    public async Task<bool> Start(MqttCommunicationType type, IPAddress ipAddress, int port, string nick,
        string password)
    {
        Type = type;
        IpAddress = ipAddress;
        Port = port;
        Nick = nick;
        Password = password;

        switch (type)
        {
            case MqttCommunicationType.Client:
                return await StartClient();
            case MqttCommunicationType.Server:
                return await StartServer();
            default:
                return false;
        }
    }

    /// <summary>
    ///     Binds the client handlers, queues the topic subscriptions and starts the managed client.
    ///     Exceptions from subscribing/starting propagate to the caller, as before.
    /// </summary>
    private async Task<bool> StartClient()
    {
        var clientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(IpAddress.ToString(), Port)
            .Build();

        var managedOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(clientOptions)
            .Build();

        BindClientHandlers();

        try
        {
            foreach (var topic in ClientTopics)
            {
                await Client.SubscribeAsync(
                    new TopicFilterBuilder()
                        .WithTopic(topic)
                        .Build()
                );
            }

            await Client.StartAsync(managedOptions);
        }
        catch
        {
            // Undo the binding made above so that a retried Start does not leave the handlers
            // subscribed twice (which would raise every event twice).
            UnbindClientHandlers();
            throw;
        }

        Running = true;

        return Running;
    }

    /// <summary>Detaches the client handlers and stops the managed client.</summary>
    private async Task StopClient()
    {
        UnbindClientHandlers();

        await Client.StopAsync();
    }

    /// <summary>Subscribes this instance's handlers to the managed client's events.</summary>
    public void BindClientHandlers()
    {
        Client.Connected += ClientOnConnected;
        Client.Disconnected += ClientOnDisconnected;
        Client.ConnectingFailed += ClientOnConnectingFailed;
        Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
    }

    /// <summary>Removes the subscriptions made by <see cref="BindClientHandlers" />.</summary>
    public void UnbindClientHandlers()
    {
        Client.Connected -= ClientOnConnected;
        Client.Disconnected -= ClientOnDisconnected;
        Client.ConnectingFailed -= ClientOnConnectingFailed;
        Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
    }

    /// <summary>
    ///     Runs <paramref name="raise" /> on the configured scheduler and waits for it to finish.
    ///     When this instance's cancellation token has been cancelled the notification is dropped
    ///     instead of throwing, because the callers are async void event handlers in which an escaping
    ///     cancellation exception cannot be observed. Exceptions thrown by subscribers still propagate.
    /// </summary>
    private async Task RaiseOnScheduler(Action raise)
    {
        try
        {
            await Task.Factory.StartNew(raise, CancellationToken, TaskCreationOptions.None, TaskScheduler);
        }
        catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
        {
            // Cancellation requested through our own token: nothing left to notify.
        }
    }

    /// <summary>
    ///     Reverses <see cref="Broadcast" />: AES-decrypts <paramref name="payload" /> with the
    ///     configured password, then LZ4-decompresses the result.
    /// </summary>
    /// <returns>The decoded bytes.</returns>
    private async Task<byte[]> DecodePayload(byte[] payload)
    {
        using (var encrypted = new MemoryStream(payload))
        using (var decrypted = await AES.Decrypt(encrypted, Password))
        using (var lz4 = new LZ4Stream(decrypted, CompressionMode.Decompress))
        using (var decoded = new MemoryStream())
        {
            await lz4.CopyToAsync(decoded);

            return decoded.ToArray();
        }
    }

    /// <summary>
    ///     Decodes an incoming message on the client. A decode failure raises
    ///     <see cref="OnClientAuthenticationFailed" />; otherwise <see cref="OnMessageReceived" />.
    /// </summary>
    private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        try
        {
            e.ApplicationMessage.Payload = await DecodePayload(e.ApplicationMessage.Payload);
        }
        catch (Exception ex)
        {
            await RaiseOnScheduler(
                () => OnClientAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)));
            return;
        }

        // Deliberately outside the try: a failure inside a subscriber (or a cancelled token) is not
        // a decryption failure and must not be reported as an authentication failure.
        await RaiseOnScheduler(() => OnMessageReceived?.Invoke(sender, e));
    }

    private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientConnectionFailed?.Invoke(sender, e));
    }

    private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientDisconnected?.Invoke(sender, e));
    }

    private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientConnected?.Invoke(sender, e));
    }

    /// <summary>
    ///     Binds the server handlers and starts the server on the configured address and port.
    /// </summary>
    /// <returns>True when the server started; false when building the options or starting threw.</returns>
    private async Task<bool> StartServer()
    {
        var optionsBuilder = new MqttServerOptionsBuilder()
            .WithDefaultEndpointBoundIPAddress(IpAddress)
            .WithDefaultEndpointPort(Port);

        BindServerHandlers();

        try
        {
            await Server.StartAsync(optionsBuilder.Build());

            Running = true;
        }
        catch (Exception ex)
        {
            // Undo the binding made above so that a retried Start does not leave the handlers
            // subscribed twice (which would raise every event twice).
            UnbindServerHandlers();

            System.Diagnostics.Trace.TraceError("MqttCommunication: starting the server failed: {0}", ex);

            Running = false;
        }

        return Running;
    }

    /// <summary>Detaches the server handlers and stops the server.</summary>
    private async Task StopServer()
    {
        UnbindServerHandlers();

        await Server.StopAsync();
    }

    private void BindServerHandlers()
    {
        Server.Started += ServerOnStarted;
        Server.Stopped += ServerOnStopped;
        Server.ClientConnected += ServerOnClientConnected;
        Server.ClientDisconnected += ServerOnClientDisconnected;
        Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
        Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
        Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
    }

    private void UnbindServerHandlers()
    {
        Server.Started -= ServerOnStarted;
        Server.Stopped -= ServerOnStopped;
        Server.ClientConnected -= ServerOnClientConnected;
        Server.ClientDisconnected -= ServerOnClientDisconnected;
        Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
        Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
        Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
    }

    /// <summary>
    ///     Decodes an incoming message on the server. A decode failure disconnects every session whose
    ///     client id equals the sender's and raises <see cref="OnServerAuthenticationFailed" />;
    ///     otherwise <see cref="OnMessageReceived" /> is raised.
    /// </summary>
    private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        try
        {
            e.ApplicationMessage.Payload = await DecodePayload(e.ApplicationMessage.Payload);
        }
        catch (Exception ex)
        {
            foreach (var session in await Server.GetClientSessionsStatusAsync())
            {
                if (string.Equals(session.ClientId, e.ClientId, StringComparison.Ordinal))
                {
                    await session.DisconnectAsync();
                }
            }

            await RaiseOnScheduler(
                () => OnServerAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)));
            return;
        }

        // Deliberately outside the try: a failure inside a subscriber (or a cancelled token) must
        // not get the sending client disconnected as if its payload had failed to decrypt.
        await RaiseOnScheduler(() => OnMessageReceived?.Invoke(sender, e));
    }

    private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientUnsubscribed?.Invoke(sender, e));
    }

    private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientSubscribed?.Invoke(sender, e));
    }

    private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnServerClientDisconnected?.Invoke(sender, e));
    }

    private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnServerClientConnected?.Invoke(sender, e));
    }

    private async void ServerOnStopped(object sender, EventArgs e)
    {
        await RaiseOnScheduler(() => OnServerStopped?.Invoke(sender, e));
    }

    private async void ServerOnStarted(object sender, EventArgs e)
    {
        await RaiseOnScheduler(() => OnServerStarted?.Invoke(sender, e));
    }

    /// <summary>
    ///     Stops the role selected by <see cref="Type" /> and clears <see cref="Running" />.
    /// </summary>
    public async Task Stop()
    {
        switch (Type)
        {
            case MqttCommunicationType.Server:
                await StopServer();
                break;
            case MqttCommunicationType.Client:
                await StopClient();
                break;
        }

        Running = false;
    }

    /// <summary>
    ///     LZ4-compresses <paramref name="payload" />, AES-encrypts the result with the configured
    ///     password and publishes it on <paramref name="topic" /> through the active role.
    /// </summary>
    /// <param name="topic">MQTT topic to publish on.</param>
    /// <param name="payload">Plain bytes to send; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="payload" /> is null.</exception>
    public async Task Broadcast(string topic, byte[] payload)
    {
        if (payload == null)
        {
            throw new ArgumentNullException(nameof(payload));
        }

        using (var compressed = new MemoryStream())
        using (var lz4 = new LZ4Stream(compressed, CompressionMode.Compress))
        {
            await lz4.WriteAsync(payload, 0, payload.Length);
            await lz4.FlushAsync();

            // Rewind so the encryptor reads the compressed bytes from the start. This happens while
            // the LZ4 stream is still open, exactly as in the original statement order.
            compressed.Position = 0L;

            using (var encrypted = await AES.Encrypt(compressed, Password))
            {
                var message = new MqttApplicationMessage
                {
                    Topic = topic,
                    Payload = encrypted.ToArray(),
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
                };

                switch (Type)
                {
                    case MqttCommunicationType.Client:
                        await Client.PublishAsync(new[] {message});
                        break;
                    case MqttCommunicationType.Server:
                        await Server.PublishAsync(new[] {message});
                        break;
                }
            }
        }
    }
}
421 }