WingMan – Diff between revs 14 and 34

Subversion Repositories:
Rev:
Show entire fileIgnore whitespace
Rev 14 Rev 34
Line 1... Line 1...
1 using System; 1 using System;
2 using System.IO; 2 using System.IO;
-   3 using System.IO.Compression;
3 using System.Net; 4 using System.Net;
4 using System.Threading; 5 using System.Threading;
5 using System.Threading.Tasks; 6 using System.Threading.Tasks;
-   7 using LZ4;
6 using MQTTnet; 8 using MQTTnet;
7 using MQTTnet.Client; 9 using MQTTnet.Client;
8 using MQTTnet.Extensions.ManagedClient; 10 using MQTTnet.Extensions.ManagedClient;
9 using MQTTnet.Protocol; 11 using MQTTnet.Protocol;
10 using MQTTnet.Server; 12 using MQTTnet.Server;
Line 180... Line 182...
180   182  
181 private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 183 private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
182 { 184 {
183 try 185 try
184 { 186 {
-   187 using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload))
-   188 {
-   189 using (var decryptedStream = await AES.Decrypt(inputStream, Password))
-   190 {
-   191 using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress))
-   192 {
-   193 using (var outpuStream = new MemoryStream())
-   194 {
-   195 await lz4Decompress.CopyToAsync(outpuStream);
-   196  
-   197 outpuStream.Position = 0L;
-   198  
-   199 e.ApplicationMessage.Payload = outpuStream.ToArray();
-   200 }
-   201 }
-   202 }
Line 185... Line 203...
185 e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); 203 }
186   204  
187 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), 205 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
188 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 206 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
Line 266... Line 284...
266   284  
267 private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 285 private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
268 { 286 {
269 try 287 try
270 { 288 {
-   289 using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload))
-   290 {
-   291 using (var decryptedStream = await AES.Decrypt(inputStream, Password))
-   292 {
-   293 using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress))
-   294 {
-   295 using (var outpuStream = new MemoryStream())
-   296 {
-   297 await lz4Decompress.CopyToAsync(outpuStream);
-   298  
-   299 outpuStream.Position = 0L;
-   300  
-   301 e.ApplicationMessage.Payload = outpuStream.ToArray();
-   302 }
-   303 }
-   304 }
Line 271... Line 305...
271 e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); 305 }
272   306  
273 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), 307 await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
274 CancellationToken, TaskContinuationOptions.None, TaskScheduler); 308 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
Line 340... Line 374...
340 Running = false; 374 Running = false;
341 } 375 }
Line 342... Line 376...
342   376  
343 public async Task Broadcast(string topic, byte[] payload) 377 public async Task Broadcast(string topic, byte[] payload)
344 { 378 {
345 using (var payloadStream = new MemoryStream(await AES.Encrypt(payload, Password))) 379 using (var compressStream = new MemoryStream())
346 { 380 {
347 switch (Type) 381 using (var lz4Stream = new LZ4Stream(compressStream, CompressionMode.Compress))
348 { 382 {
349 case MqttCommunicationType.Client: 383 await lz4Stream.WriteAsync(payload, 0, payload.Length);
-   384 await lz4Stream.FlushAsync();
350 await Client.PublishAsync(new[] 385  
-   386 compressStream.Position = 0L;
351 { 387  
352 new MqttApplicationMessage 388 using (var outputStream = await AES.Encrypt(compressStream, Password))
353 { -  
354 Topic = topic, 389 {
355 Payload = payloadStream.ToArray(), -  
356 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce 390 var data = outputStream.ToArray();
357 } -  
358 }); -  
359 break; -  
360 case MqttCommunicationType.Server: -  
361 await Server.PublishAsync(new[] 391 switch (Type)
-   392 {
-   393 case MqttCommunicationType.Client:
-   394 await Client.PublishAsync(new[]
362 { 395 {
-   396 new MqttApplicationMessage
-   397 {
-   398 Topic = topic,
-   399 Payload = data,
-   400 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
-   401 }
-   402 });
-   403 break;
-   404 case MqttCommunicationType.Server:
363 new MqttApplicationMessage 405 await Server.PublishAsync(new[]
-   406 {
-   407 new MqttApplicationMessage
364 { 408 {
365 Topic = topic, 409 Topic = topic,
366 Payload = payloadStream.ToArray(), 410 Payload = data,
-   411 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
367 QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce 412 }
-   413 });
368 } 414 break;
369 }); 415 }
370 break; 416 }
371 } 417 }
372 } 418 }
373 } 419 }
374 } 420 }