WingMan – Blame information for rev 10

Subversion Repositories:
Rev:
Rev Author Line No. Line
10 office 1 using System;
2 using System.Net;
3 using System.Text;
4 using System.Threading;
5 using System.Threading.Tasks;
6 using MQTTnet;
7 using MQTTnet.Client;
8 using MQTTnet.Extensions.ManagedClient;
9 using MQTTnet.Protocol;
10 using MQTTnet.Server;
11 using WingMan.Utilities;
12 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
13 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
14  
15 namespace WingMan.Communication
16 {
17 public class MqttCommunication : IDisposable
18 {
// ---- Handler signatures for events that originate from the managed client ----

/// <summary>Handler signature for <c>OnClientConnected</c>.</summary>
public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

/// <summary>Handler signature for <c>OnClientDisconnected</c>.</summary>
public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

/// <summary>Handler signature for <c>OnClientConnectionFailed</c>.</summary>
public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

/// <summary>Handler signature for <c>OnClientAuthenticationFailed</c>.</summary>
public delegate void ClientAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

// ---- Handler signature shared by the client and the server ----

/// <summary>Handler signature for <c>OnMessageReceived</c>.</summary>
public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

// ---- Handler signatures for events that originate from the server ----

/// <summary>Handler signature for <c>OnServerStarted</c>.</summary>
public delegate void ServerStarted(object sender, EventArgs e);

/// <summary>Handler signature for <c>OnServerStopped</c>.</summary>
public delegate void ServerStopped(object sender, EventArgs e);

/// <summary>Handler signature for <c>OnServerClientConnected</c>.</summary>
public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

/// <summary>Handler signature for <c>OnServerClientDisconnected</c>.</summary>
public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);

/// <summary>Handler signature for <c>OnClientSubscribed</c>.</summary>
public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

/// <summary>Handler signature for <c>OnClientUnsubscribed</c>.</summary>
public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

/// <summary>Handler signature for <c>OnServerAuthenticationFailed</c>.</summary>
public delegate void ServerAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);
42  
/// <summary>
/// Creates the communication object together with a managed MQTT client and an MQTT server.
/// Neither of them is started here.
/// </summary>
/// <param name="taskScheduler">Scheduler stored for dispatching event notifications.</param>
/// <param name="cancellationToken">Token stored for cancelling event notifications.</param>
public MqttCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
    CancellationToken = cancellationToken;
    TaskScheduler = taskScheduler;

    // One factory is enough to produce both endpoints.
    var mqttFactory = new MqttFactory();

    Client = mqttFactory.CreateManagedMqttClient();
    Server = mqttFactory.CreateMqttServer();
}
51  
// ---- Publicly visible state ----

/// <summary>Whether the endpoint is considered started; set by the start and stop methods.</summary>
public bool Running { get; set; }

/// <summary>Nickname handed to <c>Start</c>.</summary>
public string Nick { get; set; }

/// <summary>Role (client or server) handed to <c>Start</c>; decides which endpoint is used.</summary>
public MqttCommunicationType Type { get; set; }

// ---- MQTT endpoints created by the constructor ----

private IManagedMqttClient Client { get; }

private IMqttServer Server { get; }

// ---- Connection settings captured by Start ----

/// <summary>Address to connect to (client) or to bind to (server).</summary>
private IPAddress IpAddress { get; set; }

/// <summary>TCP port to connect to (client) or to listen on (server).</summary>
private int Port { get; set; }

/// <summary>Password passed to the AES helper when encrypting and decrypting payloads.</summary>
private string Password { get; set; }

// ---- Event dispatching ----

/// <summary>Scheduler on which event notifications are queued.</summary>
private TaskScheduler TaskScheduler { get; }

/// <summary>Token that cancels queued event notifications.</summary>
private CancellationToken CancellationToken { get; }
71  
/// <summary>
/// Stops whichever endpoint is selected by <c>Type</c>.
/// </summary>
/// <remarks>
/// Because the method is <c>async void</c>, it returns to the caller at the first incomplete await and
/// the shutdown may still be in progress afterwards. A failure while stopping is traced instead of
/// rethrown: the caller could not catch it, and <c>Dispose</c> is expected not to throw.
/// </remarks>
public async void Dispose()
{
    try
    {
        await Stop();
    }
    catch (Exception ex)
    {
        // An exception escaping an async void method is unobservable by the caller.
        System.Diagnostics.Trace.TraceError("MqttCommunication.Dispose failed while stopping: {0}", ex);
    }
}
76  
// ---- Raised in both roles ----

/// <summary>Raised after the payload of a received message has been decrypted.</summary>
public event MessageReceived OnMessageReceived;

// ---- Raised when running as client ----

/// <summary>Forwarded from the managed client's connected notification.</summary>
public event ClientConnected OnClientConnected;

/// <summary>Forwarded from the managed client's disconnected notification.</summary>
public event ClientDisconnected OnClientDisconnected;

/// <summary>Forwarded from the managed client's connecting-failed notification.</summary>
public event ClientConnectionFailed OnClientConnectionFailed;

/// <summary>Raised when handling a message received by the client fails.</summary>
public event ClientAuthenticationFailed OnClientAuthenticationFailed;

// ---- Raised when running as server ----

/// <summary>Forwarded from the server's started notification.</summary>
public event ServerStarted OnServerStarted;

/// <summary>Forwarded from the server's stopped notification.</summary>
public event ServerStopped OnServerStopped;

/// <summary>Forwarded from the server's client-connected notification.</summary>
public event ServerClientConnected OnServerClientConnected;

/// <summary>Forwarded from the server's client-disconnected notification.</summary>
public event ServerClientDisconnected OnServerClientDisconnected;

/// <summary>Forwarded from the server's client-subscribed-topic notification.</summary>
public event ClientSubscribed OnClientSubscribed;

/// <summary>Forwarded from the server's client-unsubscribed-topic notification.</summary>
public event ClientUnsubscribed OnClientUnsubscribed;

/// <summary>Raised when handling a message received by the server fails.</summary>
public event ServerAuthenticationFailed OnServerAuthenticationFailed;
100  
/// <summary>
/// Remembers the connection settings and starts the endpoint matching <paramref name="type"/>.
/// </summary>
/// <param name="type">Whether to run as client or as server.</param>
/// <param name="ipAddress">Address to connect to (client) or bind to (server).</param>
/// <param name="port">TCP port to connect to (client) or listen on (server).</param>
/// <param name="nick">Nickname to store in <c>Nick</c>.</param>
/// <param name="password">Password used for payload encryption and decryption.</param>
/// <returns>
/// The result of starting the selected endpoint, or <c>false</c> when <paramref name="type"/>
/// is neither client nor server.
/// </returns>
public async Task<bool> Start(MqttCommunicationType type, IPAddress ipAddress, int port, string nick,
    string password)
{
    Password = password;
    Nick = nick;
    Port = port;
    IpAddress = ipAddress;
    Type = type;

    if (type == MqttCommunicationType.Client)
    {
        return await StartClient();
    }

    if (type == MqttCommunicationType.Server)
    {
        return await StartServer();
    }

    // Unknown role: nothing was started.
    return false;
}
120  
/// <summary>
/// Configures the managed client for the stored address and port, subscribes it to the
/// "lobby", "exchange" and "execute" topics and starts it.
/// </summary>
/// <returns><c>true</c> once the managed client has been started.</returns>
/// <remarks>
/// Any exception thrown while subscribing or starting is rethrown to the caller. Before that, the
/// handlers registered by this call are removed again, so a failed start does not leave an extra
/// set of handlers attached to the client.
/// </remarks>
private async Task<bool> StartClient()
{
    var clientOptions = new MqttClientOptionsBuilder()
        .WithTcpServer(IpAddress.ToString(), Port);

    // Setup and start a managed MQTT client.
    var options = new ManagedMqttClientOptionsBuilder()
        .WithClientOptions(clientOptions.Build())
        .Build();

    BindClientHandlers();

    try
    {
        foreach (var topic in new[] {"lobby", "exchange", "execute"})
        {
            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic(topic)
                    .Build()
            );
        }

        await Client.StartAsync(options);
    }
    catch
    {
        // Undo exactly the registration made above; Running is left as it was.
        UnbindClientHandlers();
        throw;
    }

    Running = true;

    return Running;
}
157  
/// <summary>
/// Detaches this object's handlers from the managed client and then stops the client.
/// </summary>
private async Task StopClient()
{
    // Detach first so nothing is forwarded while the client shuts down.
    UnbindClientHandlers();

    var stopping = Client.StopAsync();
    await stopping;
}
164  
/// <summary>
/// Attaches this object's forwarding handlers to the managed client's events.
/// Every call adds another registration; pair it with <c>UnbindClientHandlers</c>.
/// </summary>
public void BindClientHandlers()
{
    // Message traffic.
    Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;

    // Connection life cycle.
    Client.ConnectingFailed += ClientOnConnectingFailed;
    Client.Disconnected += ClientOnDisconnected;
    Client.Connected += ClientOnConnected;
}
172  
/// <summary>
/// Detaches this object's forwarding handlers from the managed client's events,
/// undoing one call to <c>BindClientHandlers</c>.
/// </summary>
public void UnbindClientHandlers()
{
    // Message traffic.
    Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;

    // Connection life cycle.
    Client.ConnectingFailed -= ClientOnConnectingFailed;
    Client.Disconnected -= ClientOnDisconnected;
    Client.Connected -= ClientOnConnected;
}
180  
/// <summary>
/// Decrypts the payload of a message received by the client and raises <c>OnMessageReceived</c>,
/// or raises <c>OnClientAuthenticationFailed</c> when the payload cannot be decrypted.
/// </summary>
/// <param name="sender">Sender reported by the managed client; passed on to subscribers.</param>
/// <param name="e">Received message; its payload is replaced with the decrypted bytes.</param>
/// <remarks>
/// Only a decryption failure counts as an authentication failure. A cancelled notification is
/// dropped and an exception thrown by a subscriber is traced, so neither is reported as an
/// authentication failure nor escapes this <c>async void</c> method.
/// </remarks>
private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    Exception decryptionFailure = null;

    try
    {
        var encrypted = e.ApplicationMessage.Payload;
        if (encrypted == null)
        {
            // A message without payload cannot carry encrypted content.
            throw new ArgumentNullException(nameof(encrypted));
        }

        e.ApplicationMessage.Payload = AES.Decrypt(encrypted, Password);
    }
    catch (Exception ex)
    {
        decryptionFailure = ex;
    }

    Action notify;
    if (decryptionFailure == null)
    {
        notify = () => OnMessageReceived?.Invoke(sender, e);
    }
    else
    {
        notify = () => OnClientAuthenticationFailed?.Invoke(sender,
            new MqttAuthenticationFailureEventArgs(e, decryptionFailure));
    }

    try
    {
        // Raise the event on the scheduler supplied to the constructor.
        await Task.Factory.StartNew(notify, CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; the notification is intentionally dropped.
    }
    catch (Exception dispatchError)
    {
        System.Diagnostics.Trace.TraceError("MqttCommunication: client message subscriber failed: {0}",
            dispatchError);
    }
}
201  
/// <summary>
/// Forwards the managed client's connecting-failed notification to <c>OnClientConnectionFailed</c>
/// on the configured task scheduler.
/// </summary>
/// <param name="sender">Sender reported by the managed client; passed on to subscribers.</param>
/// <param name="e">Failure details; passed on to subscribers.</param>
private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(() => OnClientConnectionFailed?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; drop the notification instead of letting the
        // cancellation escape this async void method.
    }
}
207  
/// <summary>
/// Forwards the managed client's disconnected notification to <c>OnClientDisconnected</c>
/// on the configured task scheduler.
/// </summary>
/// <param name="sender">Sender reported by the managed client; passed on to subscribers.</param>
/// <param name="e">Disconnect details; passed on to subscribers.</param>
private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(() => OnClientDisconnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; drop the notification instead of letting the
        // cancellation escape this async void method.
    }
}
213  
/// <summary>
/// Forwards the managed client's connected notification to <c>OnClientConnected</c>
/// on the configured task scheduler.
/// </summary>
/// <param name="sender">Sender reported by the managed client; passed on to subscribers.</param>
/// <param name="e">Connection details; passed on to subscribers.</param>
private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(() => OnClientConnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; drop the notification instead of letting the
        // cancellation escape this async void method.
    }
}
219  
/// <summary>
/// Configures the MQTT server for the stored address and port, installs the subscription
/// interceptor and connection validator, and starts the server.
/// </summary>
/// <returns><c>true</c> when the server started; <c>false</c> when starting threw.</returns>
/// <remarks>
/// A failed start is traced, and the handlers registered by this call are removed again so that
/// a later attempt does not attach a second set.
/// </remarks>
private async Task<bool> StartServer()
{
    var optionsBuilder = new MqttServerOptionsBuilder()
        .WithDefaultEndpointBoundIPAddress(IpAddress)
        .WithSubscriptionInterceptor(MqttSubscriptionIntercept)
        .WithConnectionValidator(MqttConnectionValidator)
        .WithDefaultEndpointPort(Port);

    BindServerHandlers();

    try
    {
        await Server.StartAsync(optionsBuilder.Build());

        Running = true;
    }
    catch (Exception ex)
    {
        // Undo exactly the registration made above.
        UnbindServerHandlers();

        Running = false;

        System.Diagnostics.Trace.TraceError("MqttCommunication: starting the MQTT server failed: {0}", ex);
    }

    return Running;
}
243  
/// <summary>
/// Connection validator installed on the server: marks every connection attempt as accepted.
/// </summary>
/// <param name="context">Validation context whose return code is set.</param>
private void MqttConnectionValidator(MqttConnectionValidatorContext context) =>
    context.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
248  
/// <summary>
/// Detaches this object's handlers from the server and then stops the server.
/// </summary>
private async Task StopServer()
{
    // Detach first so nothing is forwarded while the server shuts down.
    UnbindServerHandlers();

    var stopping = Server.StopAsync();
    await stopping;
}
255  
/// <summary>
/// Subscription interceptor installed on the server. Subscriptions to "lobby", "exchange" and
/// "execute" are accepted; for any other topic the subscription is refused and the context is
/// flagged to close the connection.
/// </summary>
/// <param name="context">Interceptor context describing the requested topic filter.</param>
private void MqttSubscriptionIntercept(MqttSubscriptionInterceptorContext context)
{
    var requestedTopic = context.TopicFilter.Topic;

    var isKnownTopic = requestedTopic == "lobby" ||
                       requestedTopic == "exchange" ||
                       requestedTopic == "execute";

    context.AcceptSubscription = isKnownTopic;
    context.CloseConnection = !isKnownTopic;
}
270  
/// <summary>
/// Attaches this object's forwarding handlers to the server's events.
/// Every call adds another registration; pair it with <c>UnbindServerHandlers</c>.
/// </summary>
private void BindServerHandlers()
{
    // Message traffic.
    Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;

    // Per-client activity.
    Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
    Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
    Server.ClientDisconnected += ServerOnClientDisconnected;
    Server.ClientConnected += ServerOnClientConnected;

    // Server life cycle.
    Server.Stopped += ServerOnStopped;
    Server.Started += ServerOnStarted;
}
281  
/// <summary>
/// Detaches this object's forwarding handlers from the server's events,
/// undoing one call to <c>BindServerHandlers</c>.
/// </summary>
private void UnbindServerHandlers()
{
    // Message traffic.
    Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;

    // Per-client activity.
    Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
    Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
    Server.ClientDisconnected -= ServerOnClientDisconnected;
    Server.ClientConnected -= ServerOnClientConnected;

    // Server life cycle.
    Server.Stopped -= ServerOnStopped;
    Server.Started -= ServerOnStarted;
}
292  
/// <summary>
/// Decrypts the payload of a message received by the server and raises <c>OnMessageReceived</c>.
/// When the payload cannot be decrypted, the sending client's sessions are disconnected and
/// <c>OnServerAuthenticationFailed</c> is raised instead.
/// </summary>
/// <param name="sender">Sender reported by the server; passed on to subscribers.</param>
/// <param name="e">Received message; its payload is replaced with the decrypted bytes.</param>
/// <remarks>
/// Only a decryption failure counts as an authentication failure. A cancelled notification is
/// dropped and any other exception (from a subscriber or from disconnecting) is traced, so a
/// faulty local subscriber no longer gets the sending client disconnected and nothing escapes
/// this <c>async void</c> method.
/// </remarks>
private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    Exception decryptionFailure = null;

    try
    {
        var encrypted = e.ApplicationMessage.Payload;
        if (encrypted == null)
        {
            // A message without payload cannot carry encrypted content.
            throw new ArgumentNullException(nameof(encrypted));
        }

        e.ApplicationMessage.Payload = AES.Decrypt(encrypted, Password);
    }
    catch (Exception ex)
    {
        decryptionFailure = ex;
    }

    try
    {
        Action notify;

        if (decryptionFailure == null)
        {
            notify = () => OnMessageReceived?.Invoke(sender, e);
        }
        else
        {
            // Drop every session that belongs to the client whose message could not be decrypted.
            foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync())
            {
                if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal))
                {
                    continue;
                }

                await clientSessionStatus.DisconnectAsync();
            }

            notify = () => OnServerAuthenticationFailed?.Invoke(sender,
                new MqttAuthenticationFailureEventArgs(e, decryptionFailure));
        }

        // Raise the event on the scheduler supplied to the constructor.
        await Task.Factory.StartNew(notify, CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; the notification is intentionally dropped.
    }
    catch (Exception dispatchError)
    {
        System.Diagnostics.Trace.TraceError("MqttCommunication: server message handling failed: {0}",
            dispatchError);
    }
}
321  
/// <summary>
/// Forwards the server's client-unsubscribed-topic notification to <c>OnClientUnsubscribed</c>
/// on the configured task scheduler.
/// </summary>
/// <param name="sender">Sender reported by the server; passed on to subscribers.</param>
/// <param name="e">Unsubscription details; passed on to subscribers.</param>
private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(() => OnClientUnsubscribed?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; drop the notification instead of letting the
        // cancellation escape this async void method.
    }
}
327  
/// <summary>
/// Forwards the server's client-subscribed-topic notification to <c>OnClientSubscribed</c>
/// on the configured task scheduler.
/// </summary>
/// <param name="sender">Sender reported by the server; passed on to subscribers.</param>
/// <param name="e">Subscription details; passed on to subscribers.</param>
private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(() => OnClientSubscribed?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; drop the notification instead of letting the
        // cancellation escape this async void method.
    }
}
333  
/// <summary>
/// Forwards the server's client-disconnected notification to <c>OnServerClientDisconnected</c>
/// on the configured task scheduler.
/// </summary>
/// <param name="sender">Sender reported by the server; passed on to subscribers.</param>
/// <param name="e">Disconnect details; passed on to subscribers.</param>
private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(() => OnServerClientDisconnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; drop the notification instead of letting the
        // cancellation escape this async void method.
    }
}
339  
/// <summary>
/// Forwards the server's client-connected notification to <c>OnServerClientConnected</c>
/// on the configured task scheduler.
/// </summary>
/// <param name="sender">Sender reported by the server; passed on to subscribers.</param>
/// <param name="e">Connection details; passed on to subscribers.</param>
private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(() => OnServerClientConnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException)
    {
        // The token was cancelled; drop the notification instead of letting the
        // cancellation escape this async void method.
    }
}
345  
/// <summary>
/// Forwards the server's stopped notification to <c>OnServerStopped</c>, synchronously on the
/// calling thread.
/// </summary>
/// <param name="sender">Sender reported by the server; passed on to subscribers.</param>
/// <param name="e">Event data; passed on to subscribers.</param>
private void ServerOnStopped(object sender, EventArgs e)
{
    // Snapshot the delegate so the null check and the call see the same subscriber list.
    var subscribers = OnServerStopped;
    if (subscribers == null)
    {
        return;
    }

    subscribers(sender, e);
}
350  
/// <summary>
/// Forwards the server's started notification to <c>OnServerStarted</c>, synchronously on the
/// calling thread.
/// </summary>
/// <param name="sender">Sender reported by the server; passed on to subscribers.</param>
/// <param name="e">Event data; passed on to subscribers.</param>
private void ServerOnStarted(object sender, EventArgs e)
{
    // Snapshot the delegate so the null check and the call see the same subscriber list.
    var subscribers = OnServerStarted;
    if (subscribers == null)
    {
        return;
    }

    subscribers(sender, e);
}
355  
/// <summary>
/// Stops the endpoint selected by <c>Type</c> and clears <c>Running</c>.
/// </summary>
/// <returns>A task that completes once the endpoint has been stopped.</returns>
public async Task Stop()
{
    var activeType = Type;

    if (activeType == MqttCommunicationType.Server)
    {
        await StopServer();
    }
    else if (activeType == MqttCommunicationType.Client)
    {
        await StopClient();
    }

    // Cleared for every value of Type, including ones that stop nothing.
    Running = false;
}
370  
/// <summary>
/// Encrypts <paramref name="payload"/> with the stored password and publishes it on
/// <paramref name="topic"/> through the endpoint selected by <c>Type</c>.
/// </summary>
/// <param name="topic">Topic to publish on.</param>
/// <param name="payload">Plain payload; it is encrypted before being sent.</param>
/// <returns>A task that completes once the publish call has returned.</returns>
/// <remarks>Nothing is published when <c>Type</c> is neither client nor server.</remarks>
public async Task Broadcast(string topic, byte[] payload)
{
    var encryptedPayload = AES.Encrypt(payload, Password);

    var message = new MqttApplicationMessage {Topic = topic, Payload = encryptedPayload};

    switch (Type)
    {
        case MqttCommunicationType.Client:
            // The managed client takes the message wrapped in its own envelope type.
            await Client.PublishAsync(new ManagedMqttApplicationMessage {ApplicationMessage = message});
            break;
        case MqttCommunicationType.Server:
            await Server.PublishAsync(message);
            break;
    }
}
390 }
391 }