/trunk/WingMan/Communication/MqttCommunication.cs |
@@ -1,4 +1,5 @@ |
using System; |
using System.IO; |
using System.Net; |
using System.Text; |
using System.Threading; |
@@ -182,12 +183,8 @@ |
{ |
try |
{ |
var load = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); |
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); |
|
e.ApplicationMessage.Payload = AES.Decrypt(e.ApplicationMessage.Payload, Password); |
|
var load2 = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); |
|
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), |
CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
} |
@@ -221,8 +218,6 @@ |
{ |
var optionsBuilder = new MqttServerOptionsBuilder() |
.WithDefaultEndpointBoundIPAddress(IpAddress) |
.WithSubscriptionInterceptor(MqttSubscriptionIntercept) |
.WithConnectionValidator(MqttConnectionValidator) |
.WithDefaultEndpointPort(Port); |
|
BindServerHandlers(); |
@@ -241,11 +236,6 @@ |
return Running; |
} |
|
/// <summary>
/// Connection validator callback: marks every incoming connection as accepted by
/// setting <c>context.ReturnCode</c> to <c>MqttConnectReturnCode.ConnectionAccepted</c>.
/// No credentials or client properties are inspected here.
/// </summary>
/// <param name="context">Validator context whose return code is overwritten.</param>
private void MqttConnectionValidator(MqttConnectionValidatorContext context) |
{ |
context.ReturnCode = MqttConnectReturnCode.ConnectionAccepted; |
} |
|
private async Task StopServer() |
{ |
UnbindServerHandlers(); |
@@ -253,21 +243,6 @@ |
await Server.StopAsync(); |
} |
|
/// <summary>
/// Subscription interceptor callback: allows a subscription only when the requested
/// topic is exactly "lobby", "exchange" or "execute". Any other topic is refused and
/// the context is flagged to close the connection.
/// </summary>
/// <param name="context">
/// Interceptor context; its <c>AcceptSubscription</c> and <c>CloseConnection</c>
/// flags are always written by this method.
/// </param>
private void MqttSubscriptionIntercept(MqttSubscriptionInterceptorContext context) |
{ |
// Topic is compared with != against each allowed name, so the match is exact
// (case-sensitive, no wildcard handling).
if (context.TopicFilter.Topic != "lobby" && |
context.TopicFilter.Topic != "exchange" && |
context.TopicFilter.Topic != "execute") |
{ |
// Unknown topic: refuse the subscription and request that the connection be closed.
context.AcceptSubscription = false; |
context.CloseConnection = true; |
return; |
} |

// Allowed topic: accept and keep the connection open.
context.AcceptSubscription = true; |
context.CloseConnection = false; |
} |
|
private void BindServerHandlers() |
{ |
Server.Started += ServerOnStarted; |
@@ -294,12 +269,8 @@ |
{ |
try |
{ |
var load = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); |
e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); |
|
e.ApplicationMessage.Payload = AES.Decrypt(e.ApplicationMessage.Payload, Password); |
|
var load2 = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); |
|
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), |
CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
} |
@@ -343,14 +314,16 @@ |
CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
} |
|
/// <summary>
/// Raises <c>OnServerStopped</c>, forwarding the original sender and event args.
/// Presumably subscribed to the server's stopped event — the subscription is not
/// visible in this span; confirm in BindServerHandlers.
/// </summary>
/// <param name="sender">Event source, passed through unchanged.</param>
/// <param name="e">Event args, passed through unchanged.</param>
// NOTE(review): two signatures (sync and async void) and two invocation forms appear
// back to back below. This looks like the before/after lines of a change rendered
// together — confirm which pair is the live code.
private void ServerOnStopped(object sender, EventArgs e) |
private async void ServerOnStopped(object sender, EventArgs e) |
{ |
// Form 1: invoke the event directly on the calling thread.
OnServerStopped?.Invoke(sender, e); |
// Form 2: invoke the event from a continuation scheduled on TaskScheduler,
// with CancellationToken passed to both the zero-length delay and the continuation.
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerStopped?.Invoke(sender, e), |
CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
} |
|
/// <summary>
/// Raises <c>OnServerStarted</c>, forwarding the original sender and event args.
/// </summary>
/// <param name="sender">Event source, passed through unchanged.</param>
/// <param name="e">Event args, passed through unchanged.</param>
// NOTE(review): two signatures (sync and async void) and two invocation forms appear
// back to back below. This looks like the before/after lines of a change rendered
// together — confirm which pair is the live code.
private void ServerOnStarted(object sender, EventArgs e) |
private async void ServerOnStarted(object sender, EventArgs e) |
{ |
// Form 1: invoke the event directly on the calling thread.
OnServerStarted?.Invoke(sender, e); |
// Form 2: invoke the event from a continuation scheduled on TaskScheduler,
// with CancellationToken passed to both the zero-length delay and the continuation.
await Task.Delay(0, CancellationToken).ContinueWith(_ => OnServerStarted?.Invoke(sender, e), |
CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
} |
|
public async Task Stop() |
@@ -370,21 +343,33 @@ |
|
/// <summary>
/// Passes <paramref name="payload"/> through <c>AES.Encrypt</c> with <c>Password</c>
/// and publishes the result on <paramref name="topic"/>, using <c>Client</c> or
/// <c>Server</c> depending on <c>Type</c>.
/// </summary>
/// <param name="topic">Topic the message is published on.</param>
/// <param name="payload">Raw payload bytes handed to <c>AES.Encrypt</c>.</param>
/// <returns>A task that completes once the selected publish call has been awaited.</returns>
// NOTE(review): two implementations are interleaved below — one that calls AES.Encrypt
// synchronously and publishes a single message, and one that awaits AES.Encrypt, wraps
// the result in a MemoryStream and publishes a one-element message array. This looks
// like the before/after lines of a change rendered together; confirm which is live.
public async Task Broadcast(string topic, byte[] payload) |
{ |
// Form 1: synchronous encrypt; the result is used directly as the message payload.
var encryptedPayload = AES.Encrypt(payload, Password); |

// 'load' is assigned here but not read anywhere else in the visible lines.
var load = Encoding.UTF8.GetString(encryptedPayload); |

switch (Type) |
// Form 2: awaited encrypt; the encrypted bytes are held in a MemoryStream that is
// disposed when the using block exits.
using (var payloadStream = new MemoryStream(await AES.Encrypt(payload, Password))) |
{ |
case MqttCommunicationType.Client: |
await Client.PublishAsync(new ManagedMqttApplicationMessage |
{ |
ApplicationMessage = new MqttApplicationMessage {Topic = topic, Payload = encryptedPayload } |
}); |
break; |
case MqttCommunicationType.Server: |
await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = encryptedPayload }); |
break; |
switch (Type) |
{ |
case MqttCommunicationType.Client: |
// Publishes a one-element array; the payload is a copy of the stream contents.
await Client.PublishAsync(new [] |
{ |
new MqttApplicationMessage |
{ |
Topic = topic, |
Payload = payloadStream.ToArray(), |
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce |
} |
}); |
break; |
case MqttCommunicationType.Server: |
// Same message shape as the client branch, sent through Server instead.
await Server.PublishAsync(new[] |
{ |
new MqttApplicationMessage |
{ |
Topic = topic, |
Payload = payloadStream.ToArray(), |
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce |
} |
}); |
break; |
} |
} |
} |
} |