WingMan

Subversion Repositories:
Compare Path: Rev
With Path: Rev
?path1? @ 33  →  ?path2? @ 34
/trunk/WingMan/Communication/MqttCommunication.cs
@@ -1,8 +1,10 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using LZ4;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
@@ -182,8 +184,24 @@
{
try
{
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload))
{
using (var decryptedStream = await AES.Decrypt(inputStream, Password))
{
using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress))
{
using (var outpuStream = new MemoryStream())
{
await lz4Decompress.CopyToAsync(outpuStream);
 
outpuStream.Position = 0L;
 
e.ApplicationMessage.Payload = outpuStream.ToArray();
}
}
}
}
 
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler);
}
@@ -268,8 +286,24 @@
{
try
{
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload))
{
using (var decryptedStream = await AES.Decrypt(inputStream, Password))
{
using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress))
{
using (var outpuStream = new MemoryStream())
{
await lz4Decompress.CopyToAsync(outpuStream);
 
outpuStream.Position = 0L;
 
e.ApplicationMessage.Payload = outpuStream.ToArray();
}
}
}
}
 
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
CancellationToken, TaskContinuationOptions.None, TaskScheduler);
}
@@ -342,32 +376,44 @@
 
public async Task Broadcast(string topic, byte[] payload)
{
using (var payloadStream = new MemoryStream(await AES.Encrypt(payload, Password)))
using (var compressStream = new MemoryStream())
{
switch (Type)
using (var lz4Stream = new LZ4Stream(compressStream, CompressionMode.Compress))
{
case MqttCommunicationType.Client:
await Client.PublishAsync(new[]
await lz4Stream.WriteAsync(payload, 0, payload.Length);
await lz4Stream.FlushAsync();
 
compressStream.Position = 0L;
 
using (var outputStream = await AES.Encrypt(compressStream, Password))
{
var data = outputStream.ToArray();
switch (Type)
{
new MqttApplicationMessage
{
Topic = topic,
Payload = payloadStream.ToArray(),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
}
});
break;
case MqttCommunicationType.Server:
await Server.PublishAsync(new[]
{
new MqttApplicationMessage
{
Topic = topic,
Payload = payloadStream.ToArray(),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
}
});
break;
case MqttCommunicationType.Client:
await Client.PublishAsync(new[]
{
new MqttApplicationMessage
{
Topic = topic,
Payload = data,
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
}
});
break;
case MqttCommunicationType.Server:
await Server.PublishAsync(new[]
{
new MqttApplicationMessage
{
Topic = topic,
Payload = data,
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
}
});
break;
}
}
}
}
}