WingMan – Diff between revs 35 and 36

Subversion Repositories:
Rev:
Only display areas with differences | Ignore whitespace
Rev 35 Rev 36
1 using System; 1 using System;
2 using System.IO; 2 using System.IO;
3 using System.IO.Compression; 3 using System.IO.Compression;
4 using System.Net; 4 using System.Net;
5 using System.Threading; 5 using System.Threading;
6 using System.Threading.Tasks; 6 using System.Threading.Tasks;
7 using LZ4; 7 using LZ4;
8 using MQTTnet; 8 using MQTTnet;
9 using MQTTnet.Client; 9 using MQTTnet.Client;
10 using MQTTnet.Extensions.ManagedClient; 10 using MQTTnet.Extensions.ManagedClient;
11 using MQTTnet.Protocol; 11 using MQTTnet.Protocol;
12 using MQTTnet.Server; 12 using MQTTnet.Server;
13 using WingMan.Utilities; 13 using WingMan.Utilities;
14 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; 14 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
15 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; 15 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
16   16  
17 namespace WingMan.Communication 17 namespace WingMan.Communication
18 { 18 {
19 public class MqttCommunication : IDisposable 19 public class MqttCommunication : IDisposable
20 { 20 {
21 public delegate void ClientAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e); 21 public delegate void ClientAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);
22   22  
23 public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e); 23 public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);
24   24  
25 public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e); 25 public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);
26   26  
27 public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e); 27 public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);
28   28  
29 public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e); 29 public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);
30   30  
31 public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e); 31 public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);
32   32  
33 public delegate void MessageReceived(object sender, MqttCommunicationMessageReceivedEventArgs e); 33 public delegate void MessageReceived(object sender, MqttCommunicationMessageReceivedEventArgs e);
34   34  
35 public delegate void ServerAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e); 35 public delegate void ServerAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);
36   36  
37 public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e); 37 public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);
38   38  
39 public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e); 39 public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);
40   40  
41 public delegate void ServerStarted(object sender, EventArgs e); 41 public delegate void ServerStarted(object sender, EventArgs e);
42   42  
43 public delegate void ServerStopped(object sender, EventArgs e); 43 public delegate void ServerStopped(object sender, EventArgs e);
44   44  
45 public MqttCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken) 45 public MqttCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
46 { 46 {
47 TaskScheduler = taskScheduler; 47 TaskScheduler = taskScheduler;
48 CancellationToken = cancellationToken; 48 CancellationToken = cancellationToken;
49   49  
50 Client = new MqttFactory().CreateManagedMqttClient(); 50 Client = new MqttFactory().CreateManagedMqttClient();
51 Server = new MqttFactory().CreateMqttServer(); 51 Server = new MqttFactory().CreateMqttServer();
52 } 52 }
53   53  
54 private TaskScheduler TaskScheduler { get; } 54 private TaskScheduler TaskScheduler { get; }
55   55  
56 private IManagedMqttClient Client { get; } 56 private IManagedMqttClient Client { get; }
57   57  
58 private IMqttServer Server { get; } 58 private IMqttServer Server { get; }
59   59  
60 public bool Running { get; set; } 60 public bool Running { get; set; }
61   61  
62 public string Nick { get; set; } 62 public string Nick { get; set; }
63   63  
64 private IPAddress IpAddress { get; set; } 64 private IPAddress IpAddress { get; set; }
65   65  
66 private int Port { get; set; } 66 public int Port { get; set; }
67   67  
68 private string Password { get; set; } 68 private string Password { get; set; }
69   69  
70 private CancellationToken CancellationToken { get; } 70 private CancellationToken CancellationToken { get; }
71   71  
72 public MqttCommunicationType Type { get; set; } 72 public MqttCommunicationType Type { get; set; }
73   73  
74 public async void Dispose() 74 public async void Dispose()
75 { 75 {
76 await Stop(); 76 await Stop();
77 } 77 }
78   78  
79 public event ClientAuthenticationFailed OnClientAuthenticationFailed; 79 public event ClientAuthenticationFailed OnClientAuthenticationFailed;
80   80  
81 public event ServerAuthenticationFailed OnServerAuthenticationFailed; 81 public event ServerAuthenticationFailed OnServerAuthenticationFailed;
82   82  
83 public event MessageReceived OnMessageReceived; 83 public event MessageReceived OnMessageReceived;
84   84  
85 public event ClientConnected OnClientConnected; 85 public event ClientConnected OnClientConnected;
86   86  
87 public event ClientDisconnected OnClientDisconnected; 87 public event ClientDisconnected OnClientDisconnected;
88   88  
89 public event ClientConnectionFailed OnClientConnectionFailed; 89 public event ClientConnectionFailed OnClientConnectionFailed;
90   90  
91 public event ClientUnsubscribed OnClientUnsubscribed; 91 public event ClientUnsubscribed OnClientUnsubscribed;
92   92  
93 public event ClientSubscribed OnClientSubscribed; 93 public event ClientSubscribed OnClientSubscribed;
94   94  
95 public event ServerClientDisconnected OnServerClientDisconnected; 95 public event ServerClientDisconnected OnServerClientDisconnected;
96   96  
97 public event ServerClientConnected OnServerClientConnected; 97 public event ServerClientConnected OnServerClientConnected;
98   98  
99 public event ServerStarted OnServerStarted; 99 public event ServerStarted OnServerStarted;
100   100  
101 public event ServerStopped OnServerStopped; 101 public event ServerStopped OnServerStopped;
102   102  
103 public async Task<bool> Start(MqttCommunicationType type, IPAddress ipAddress, int port, string nick, 103 public async Task<bool> Start(MqttCommunicationType type, IPAddress ipAddress, int port, string nick,
104 string password) 104 string password)
105 { 105 {
106 Type = type; 106 Type = type;
107 IpAddress = ipAddress; 107 IpAddress = ipAddress;
108 Port = port; 108 Port = port;
109 Nick = nick; 109 Nick = nick;
110 Password = password; 110 Password = password;
111   111  
112 switch (type) 112 switch (type)
113 { 113 {
114 case MqttCommunicationType.Client: 114 case MqttCommunicationType.Client:
115 return await StartClient(); 115 return await StartClient();
116 case MqttCommunicationType.Server: 116 case MqttCommunicationType.Server:
117 return await StartServer(); 117 return await StartServer();
118 } 118 }
119   119  
120 return false; 120 return false;
121 } 121 }
122   122  
123 private async Task<bool> StartClient() 123 private async Task<bool> StartClient()
124 { 124 {
125 var clientOptions = new MqttClientOptionsBuilder() 125 var clientOptions = new MqttClientOptionsBuilder()
126 .WithTcpServer(IpAddress.ToString(), Port); 126 .WithTcpServer(IpAddress.ToString(), Port);
127   127  
128 // Setup and start a managed MQTT client. 128 // Setup and start a managed MQTT client.
129 var options = new ManagedMqttClientOptionsBuilder() 129 var options = new ManagedMqttClientOptionsBuilder()
130 .WithClientOptions(clientOptions.Build()) 130 .WithClientOptions(clientOptions.Build())
131 .Build(); 131 .Build();
132   132  
133 BindClientHandlers(); 133 BindClientHandlers();
134   134  
135 await Client.SubscribeAsync( 135 await Client.SubscribeAsync(
136 new TopicFilterBuilder() 136 new TopicFilterBuilder()
137 .WithTopic("lobby") 137 .WithTopic("lobby")
138 .Build() 138 .Build()
139 ); 139 );
140   140  
141 await Client.SubscribeAsync( 141 await Client.SubscribeAsync(
142 new TopicFilterBuilder() 142 new TopicFilterBuilder()
143 .WithTopic("exchange") 143 .WithTopic("exchange")
144 .Build() 144 .Build()
145 ); 145 );
146   146  
147 await Client.SubscribeAsync( 147 await Client.SubscribeAsync(
148 new TopicFilterBuilder() 148 new TopicFilterBuilder()
149 .WithTopic("execute") 149 .WithTopic("execute")
150 .Build() 150 .Build()
151 ); 151 );
152   152  
153 await Client.StartAsync(options); 153 await Client.StartAsync(options);
154   154  
155 Running = true; 155 Running = true;
156   156  
157 return Running; 157 return Running;
158 } 158 }
159   159  
160 private async Task StopClient() 160 private async Task StopClient()
161 { 161 {
162 UnbindClientHandlers(); -  
163   -  
164 await Client.StopAsync(); 162 await Client.StopAsync();
-   163  
-   164 UnbindClientHandlers();
165 } 165 }
166   166  
167 public void BindClientHandlers() 167 public void BindClientHandlers()
168 { 168 {
169 Client.Connected += ClientOnConnected; 169 Client.Connected += ClientOnConnected;
170 Client.Disconnected += ClientOnDisconnected; 170 Client.Disconnected += ClientOnDisconnected;
171 Client.ConnectingFailed += ClientOnConnectingFailed; 171 Client.ConnectingFailed += ClientOnConnectingFailed;
172 Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived; 172 Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
173 } 173 }
174   174  
175 public void UnbindClientHandlers() 175 public void UnbindClientHandlers()
176 { 176 {
177 Client.Connected -= ClientOnConnected; 177 Client.Connected -= ClientOnConnected;
178 Client.Disconnected -= ClientOnDisconnected; 178 Client.Disconnected -= ClientOnDisconnected;
179 Client.ConnectingFailed -= ClientOnConnectingFailed; 179 Client.ConnectingFailed -= ClientOnConnectingFailed;
180 Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived; 180 Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
181 } 181 }
182   182  
183 private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 183 private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
184 { 184 {
185 try 185 try
186 { 186 {
187 using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload)) 187 using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload))
188 { 188 {
189 using (var decryptedStream = await AES.Decrypt(inputStream, Password)) 189 using (var decryptedStream = await Aes.Decrypt(inputStream, Password))
190 { 190 {
191 using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress)) 191 using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress))
192 { 192 {
193 var outpuStream = new MemoryStream(); 193 var outpuStream = new MemoryStream();
194 await lz4Decompress.CopyToAsync(outpuStream); 194 await lz4Decompress.CopyToAsync(outpuStream);
195   195  
196 outpuStream.Position = 0L; 196 outpuStream.Position = 0L;
197   197  
198 await Task.Delay(0, CancellationToken).ContinueWith( 198 await Task.Delay(0, CancellationToken).ContinueWith(
199 _ => OnMessageReceived?.Invoke(sender, 199 _ => OnMessageReceived?.Invoke(sender,
200 new MqttCommunicationMessageReceivedEventArgs(e.ApplicationMessage.Topic, 200 new MqttCommunicationMessageReceivedEventArgs(e.ApplicationMessage.Topic,
201 outpuStream)), 201 outpuStream)),
202 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 202 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
203 } 203 }
204 } 204 }
205 } 205 }
206 } 206 }
207 catch (Exception ex) 207 catch (Exception ex)
208 { 208 {
209 await Task.Delay(0, CancellationToken).ContinueWith( 209 await Task.Delay(0, CancellationToken).ContinueWith(
210 _ => OnClientAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)), 210 _ => OnClientAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)),
211 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 211 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
212 } 212 }
213 } 213 }
214   214  
215 private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e) 215 private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
216 { 216 {
217 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientConnectionFailed?.Invoke(sender, e), 217 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientConnectionFailed?.Invoke(sender, e),
218 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 218 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
219 } 219 }
220   220  
221 private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e) 221 private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
222 { 222 {
223 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientDisconnected?.Invoke(sender, e), 223 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientDisconnected?.Invoke(sender, e),
224 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 224 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
225 } 225 }
226   226  
227 private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e) 227 private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
228 { 228 {
229 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientConnected?.Invoke(sender, e), 229 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientConnected?.Invoke(sender, e),
230 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 230 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
231 } 231 }
232   232  
233 private async Task<bool> StartServer() 233 private async Task<bool> StartServer()
234 { 234 {
235 var optionsBuilder = new MqttServerOptionsBuilder() 235 var optionsBuilder = new MqttServerOptionsBuilder()
236 .WithDefaultEndpointBoundIPAddress(IpAddress) 236 .WithDefaultEndpointBoundIPAddress(IpAddress)
237 .WithDefaultEndpointPort(Port); 237 .WithDefaultEndpointPort(Port);
238   238  
239 BindServerHandlers(); 239 BindServerHandlers();
240   240  
241 try 241 try
242 { 242 {
243 await Server.StartAsync(optionsBuilder.Build()); 243 await Server.StartAsync(optionsBuilder.Build());
244   244  
245 Running = true; 245 Running = true;
246 } 246 }
247 catch (Exception) 247 catch (Exception)
248 { 248 {
249 Running = false; 249 Running = false;
250 } 250 }
251   251  
252 return Running; 252 return Running;
253 } 253 }
254   254  
255 private async Task StopServer() 255 private async Task StopServer()
256 { 256 {
257 UnbindServerHandlers(); -  
258   -  
259 await Server.StopAsync(); 257 await Server.StopAsync();
-   258  
-   259 UnbindServerHandlers();
260 } 260 }
261   261  
262 private void BindServerHandlers() 262 private void BindServerHandlers()
263 { 263 {
264 Server.Started += ServerOnStarted; 264 Server.Started += ServerOnStarted;
265 Server.Stopped += ServerOnStopped; 265 Server.Stopped += ServerOnStopped;
266 Server.ClientConnected += ServerOnClientConnected; 266 Server.ClientConnected += ServerOnClientConnected;
267 Server.ClientDisconnected += ServerOnClientDisconnected; 267 Server.ClientDisconnected += ServerOnClientDisconnected;
268 Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic; 268 Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
269 Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic; 269 Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
270 Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived; 270 Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
271 } 271 }
272   272  
273 private void UnbindServerHandlers() 273 private void UnbindServerHandlers()
274 { 274 {
275 Server.Started -= ServerOnStarted; 275 Server.Started -= ServerOnStarted;
276 Server.Stopped -= ServerOnStopped; 276 Server.Stopped -= ServerOnStopped;
277 Server.ClientConnected -= ServerOnClientConnected; 277 Server.ClientConnected -= ServerOnClientConnected;
278 Server.ClientDisconnected -= ServerOnClientDisconnected; 278 Server.ClientDisconnected -= ServerOnClientDisconnected;
279 Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic; 279 Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
280 Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic; 280 Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
281 Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived; 281 Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
282 } 282 }
283   283  
284 private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 284 private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
285 { 285 {
286 try 286 try
287 { 287 {
288 using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload)) 288 using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload))
289 { 289 {
290 using (var decryptedStream = await AES.Decrypt(inputStream, Password)) 290 using (var decryptedStream = await Aes.Decrypt(inputStream, Password))
291 { 291 {
292 using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress)) 292 using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress))
293 { 293 {
294 var outpuStream = new MemoryStream(); 294 var outpuStream = new MemoryStream();
295 await lz4Decompress.CopyToAsync(outpuStream); 295 await lz4Decompress.CopyToAsync(outpuStream);
296   296  
297 outpuStream.Position = 0L; 297 outpuStream.Position = 0L;
298   298  
299 await Task.Delay(0, CancellationToken).ContinueWith( 299 await Task.Delay(0, CancellationToken).ContinueWith(
300 _ => OnMessageReceived?.Invoke(sender, 300 _ => OnMessageReceived?.Invoke(sender,
301 new MqttCommunicationMessageReceivedEventArgs(e.ApplicationMessage.Topic, 301 new MqttCommunicationMessageReceivedEventArgs(e.ApplicationMessage.Topic,
302 outpuStream)), 302 outpuStream)),
303 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 303 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
304 } 304 }
305 } 305 }
306 } 306 }
307 } 307 }
308 catch (Exception ex) 308 catch (Exception ex)
309 { 309 {
310 foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync()) 310 foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync())
311 { 311 {
312 if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal)) 312 if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal))
313 continue; 313 continue;
314   314  
315 await clientSessionStatus.DisconnectAsync(); 315 await clientSessionStatus.DisconnectAsync();
316 } 316 }
317   317  
318 await Task.Delay(0, CancellationToken).ContinueWith( 318 await Task.Delay(0, CancellationToken).ContinueWith(
319 _ => OnServerAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)), 319 _ => OnServerAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)),
320 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 320 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
321 } 321 }
322 } 322 }
323   323  
324 private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e) 324 private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
325 { 325 {
326 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientUnsubscribed?.Invoke(sender, e), 326 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientUnsubscribed?.Invoke(sender, e),
327 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 327 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
328 } 328 }
329   329  
330 private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e) 330 private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
331 { 331 {
332 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientSubscribed?.Invoke(sender, e), 332 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnClientSubscribed?.Invoke(sender, e),
333 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 333 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
334 } 334 }
335   335  
336 private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e) 336 private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
337 { 337 {
338 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerClientDisconnected?.Invoke(sender, e), 338 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerClientDisconnected?.Invoke(sender, e),
339 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 339 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
340 } 340 }
341   341  
342 private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e) 342 private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
343 { 343 {
344 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerClientConnected?.Invoke(sender, e), 344 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerClientConnected?.Invoke(sender, e),
345 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 345 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
346 } 346 }
347   347  
348 private async void ServerOnStopped(object sender, EventArgs e) 348 private async void ServerOnStopped(object sender, EventArgs e)
349 { 349 {
350 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerStopped?.Invoke(sender, e), 350 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerStopped?.Invoke(sender, e),
351 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 351 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
352 } 352 }
353   353  
354 private async void ServerOnStarted(object sender, EventArgs e) 354 private async void ServerOnStarted(object sender, EventArgs e)
355 { 355 {
356 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerStarted?.Invoke(sender, e), 356 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerStarted?.Invoke(sender, e),
357 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 357 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
358 } 358 }
359   359  
360 public async Task Stop() 360 public async Task Stop()
361 { 361 {
362 switch (Type) 362 switch (Type)
363 { 363 {
364 case MqttCommunicationType.Server: 364 case MqttCommunicationType.Server:
365 await StopServer(); 365 await StopServer();
366 break; 366 break;
367 case MqttCommunicationType.Client: 367 case MqttCommunicationType.Client:
368 await StopClient(); 368 await StopClient();
369 break; 369 break;
370 } 370 }
371   371  
372 Running = false; 372 Running = false;
373 } 373 }
374   374  
375 public async Task Broadcast(string topic, byte[] payload) 375 public async Task Broadcast(string topic, byte[] payload)
376 { 376 {
377 using (var compressStream = new MemoryStream()) 377 using (var compressStream = new MemoryStream())
378 { 378 {
379 using (var lz4Stream = new LZ4Stream(compressStream, CompressionMode.Compress)) 379 using (var lz4Stream = new LZ4Stream(compressStream, CompressionMode.Compress))
380 { 380 {
381 await lz4Stream.WriteAsync(payload, 0, payload.Length); 381 await lz4Stream.WriteAsync(payload, 0, payload.Length);
382 await lz4Stream.FlushAsync(); 382 await lz4Stream.FlushAsync();
383   383  
384 compressStream.Position = 0L; 384 compressStream.Position = 0L;
385   385  
386 using (var outputStream = await AES.Encrypt(compressStream, Password)) 386 using (var outputStream = await Aes.Encrypt(compressStream, Password))
387 { 387 {
388 var data = outputStream.ToArray(); 388 var data = outputStream.ToArray();
389 switch (Type) 389 switch (Type)
390 { 390 {
391 case MqttCommunicationType.Client: 391 case MqttCommunicationType.Client:
392 await Client.PublishAsync(new[] 392 await Client.PublishAsync(new[]
393 { 393 {
394 new MqttApplicationMessage 394 new MqttApplicationMessage
395 { 395 {
396 Topic = topic, 396 Topic = topic,
397 Payload = data, 397 Payload = data,
398 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce 398 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
399 } 399 }
400 }); 400 });
401 break; 401 break;
402 case MqttCommunicationType.Server: 402 case MqttCommunicationType.Server:
403 await Server.PublishAsync(new[] 403 await Server.PublishAsync(new[]
404 { 404 {
405 new MqttApplicationMessage 405 new MqttApplicationMessage
406 { 406 {
407 Topic = topic, 407 Topic = topic,
408 Payload = data, 408 Payload = data,
409 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce 409 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
410 } 410 }
411 }); 411 });
412 break; 412 break;
413 } 413 }
414 } 414 }
415 } 415 }
416 } 416 }
417 } 417 }
418 } 418 }
419 } 419 }
420   420