/trunk/WingMan/Communication/MqttCommunication.cs |
@@ -1,8 +1,10 @@ |
using System; |
using System.IO; |
using System.IO.Compression; |
using System.Net; |
using System.Threading; |
using System.Threading.Tasks; |
using LZ4; |
using MQTTnet; |
using MQTTnet.Client; |
using MQTTnet.Extensions.ManagedClient; |
@@ -182,8 +184,24 @@ |
{ |
try |
{ |
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); |
using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload)) |
{ |
using (var decryptedStream = await AES.Decrypt(inputStream, Password)) |
{ |
using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress)) |
{ |
using (var outpuStream = new MemoryStream()) |
{ |
await lz4Decompress.CopyToAsync(outpuStream); |
|
outpuStream.Position = 0L; |
|
e.ApplicationMessage.Payload = outpuStream.ToArray(); |
} |
} |
} |
} |
|
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), |
CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
} |
@@ -268,8 +286,24 @@ |
{ |
try |
{ |
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); |
using (var inputStream = new MemoryStream(e.ApplicationMessage.Payload)) |
{ |
using (var decryptedStream = await AES.Decrypt(inputStream, Password)) |
{ |
using (var lz4Decompress = new LZ4Stream(decryptedStream, CompressionMode.Decompress)) |
{ |
using (var outpuStream = new MemoryStream()) |
{ |
await lz4Decompress.CopyToAsync(outpuStream); |
|
outpuStream.Position = 0L; |
|
e.ApplicationMessage.Payload = outpuStream.ToArray(); |
} |
} |
} |
} |
|
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), |
CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
} |
@@ -342,32 +376,44 @@ |
|
public async Task Broadcast(string topic, byte[] payload) |
{ |
using (var payloadStream = new MemoryStream(await AES.Encrypt(payload, Password))) |
using (var compressStream = new MemoryStream()) |
{ |
switch (Type) |
using (var lz4Stream = new LZ4Stream(compressStream, CompressionMode.Compress)) |
{ |
case MqttCommunicationType.Client: |
await Client.PublishAsync(new[] |
await lz4Stream.WriteAsync(payload, 0, payload.Length); |
await lz4Stream.FlushAsync(); |
|
compressStream.Position = 0L; |
|
using (var outputStream = await AES.Encrypt(compressStream, Password)) |
{ |
var data = outputStream.ToArray(); |
switch (Type) |
{ |
new MqttApplicationMessage |
{ |
Topic = topic, |
Payload = payloadStream.ToArray(), |
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce |
} |
}); |
break; |
case MqttCommunicationType.Server: |
await Server.PublishAsync(new[] |
{ |
new MqttApplicationMessage |
{ |
Topic = topic, |
Payload = payloadStream.ToArray(), |
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce |
} |
}); |
break; |
case MqttCommunicationType.Client: |
await Client.PublishAsync(new[] |
{ |
new MqttApplicationMessage |
{ |
Topic = topic, |
Payload = data, |
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce |
} |
}); |
break; |
case MqttCommunicationType.Server: |
await Server.PublishAsync(new[] |
{ |
new MqttApplicationMessage |
{ |
Topic = topic, |
Payload = data, |
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce |
} |
}); |
break; |
} |
} |
} |
} |
} |