WingMan – Blame information for rev 14

Subversion Repositories:
Rev:
Rev Author Line No. Line
10 office 1 using System;
12 office 2 using System.IO;
10 office 3 using System.Net;
4 using System.Threading;
5 using System.Threading.Tasks;
6 using MQTTnet;
7 using MQTTnet.Client;
8 using MQTTnet.Extensions.ManagedClient;
9 using MQTTnet.Protocol;
10 using MQTTnet.Server;
11 using WingMan.Utilities;
12 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
13 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
14  
15 namespace WingMan.Communication
16 {
17 public class MqttCommunication : IDisposable
18 {
// ---- Delegates for events raised while acting as a client ----

/// <summary>Signature for handlers of <see cref="OnClientConnected"/>.</summary>
public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

/// <summary>Signature for handlers of <see cref="OnClientDisconnected"/>.</summary>
public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

/// <summary>Signature for handlers of <see cref="OnClientConnectionFailed"/>.</summary>
public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

/// <summary>Signature for handlers of <see cref="OnClientAuthenticationFailed"/>.</summary>
public delegate void ClientAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

// ---- Delegates for events raised while acting as a server ----

/// <summary>Signature for handlers of <see cref="OnServerStarted"/>.</summary>
public delegate void ServerStarted(object sender, EventArgs e);

/// <summary>Signature for handlers of <see cref="OnServerStopped"/>.</summary>
public delegate void ServerStopped(object sender, EventArgs e);

/// <summary>Signature for handlers of <see cref="OnServerClientConnected"/>.</summary>
public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

/// <summary>Signature for handlers of <see cref="OnServerClientDisconnected"/>.</summary>
public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);

/// <summary>Signature for handlers of <see cref="OnClientSubscribed"/>.</summary>
public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

/// <summary>Signature for handlers of <see cref="OnClientUnsubscribed"/>.</summary>
public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

/// <summary>Signature for handlers of <see cref="OnServerAuthenticationFailed"/>.</summary>
public delegate void ServerAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

// ---- Delegate shared by both roles ----

/// <summary>Signature for handlers of <see cref="OnMessageReceived"/>.</summary>
public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);
42  
/// <summary>
/// Creates the communication endpoint together with its managed MQTT client and MQTT server
/// instances. Neither is started here; call <see cref="Start"/> for that.
/// </summary>
/// <param name="taskScheduler">Scheduler on which the public events of this class are raised.</param>
/// <param name="cancellationToken">Token passed to the tasks that raise the public events.</param>
public MqttCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
{
    // Each endpoint gets its own factory instance, as before.
    var clientFactory = new MqttFactory();
    var serverFactory = new MqttFactory();

    Client = clientFactory.CreateManagedMqttClient();
    Server = serverFactory.CreateMqttServer();

    CancellationToken = cancellationToken;
    TaskScheduler = taskScheduler;
}
51  
// ---- Public state ----

/// <summary>Role selected by the most recent call to <see cref="Start"/>.</summary>
public MqttCommunicationType Type { get; set; }

/// <summary>Set to true after a successful start and reset to false by <see cref="Stop"/>.</summary>
public bool Running { get; set; }

/// <summary>Nickname handed to <see cref="Start"/>; this class only stores it.</summary>
public string Nick { get; set; }

// ---- Connection settings captured by Start ----

/// <summary>Address the client connects to, or the server binds to.</summary>
private IPAddress IpAddress { get; set; }

/// <summary>TCP port used together with <see cref="IpAddress"/>.</summary>
private int Port { get; set; }

/// <summary>Password passed to AES.Encrypt / AES.Decrypt for message payloads.</summary>
private string Password { get; set; }

// ---- Infrastructure supplied to or created by the constructor ----

/// <summary>Managed MQTT client used when running as a client.</summary>
private IManagedMqttClient Client { get; }

/// <summary>MQTT server used when running as a server.</summary>
private IMqttServer Server { get; }

/// <summary>Scheduler on which the public events are raised.</summary>
private TaskScheduler TaskScheduler { get; }

/// <summary>Token passed to the tasks that raise the public events.</summary>
private CancellationToken CancellationToken { get; }
71  
/// <summary>
/// Stops whichever endpoint is active by calling <see cref="Stop"/>.
/// </summary>
/// <remarks>
/// The method is <c>async void</c>, so it returns to the caller at the first incomplete await;
/// the shutdown may still be in progress afterwards. Callers that need to wait for the
/// shutdown should await <see cref="Stop"/> themselves before disposing.
/// </remarks>
public async void Dispose()
{
    try
    {
        await Stop();
    }
    catch (Exception ex)
    {
        // Dispose must not throw. In an async void method an escaping exception cannot be
        // caught by the caller and surfaces as an unhandled exception, so it is recorded
        // here instead.
        System.Diagnostics.Trace.TraceError("MqttCommunication.Dispose: stopping failed: {0}", ex);
    }
}
76  
// ---- Raised in both roles ----

/// <summary>Raised after an incoming message payload was decrypted successfully.</summary>
public event MessageReceived OnMessageReceived;

// ---- Raised while running as a client ----

/// <summary>Forwards the managed client's Connected event.</summary>
public event ClientConnected OnClientConnected;

/// <summary>Forwards the managed client's Disconnected event.</summary>
public event ClientDisconnected OnClientDisconnected;

/// <summary>Forwards the managed client's ConnectingFailed event.</summary>
public event ClientConnectionFailed OnClientConnectionFailed;

/// <summary>Raised when a message received by the client could not be decrypted.</summary>
public event ClientAuthenticationFailed OnClientAuthenticationFailed;

// ---- Raised while running as a server ----

/// <summary>Forwards the server's Started event.</summary>
public event ServerStarted OnServerStarted;

/// <summary>Forwards the server's Stopped event.</summary>
public event ServerStopped OnServerStopped;

/// <summary>Forwards the server's ClientConnected event.</summary>
public event ServerClientConnected OnServerClientConnected;

/// <summary>Forwards the server's ClientDisconnected event.</summary>
public event ServerClientDisconnected OnServerClientDisconnected;

/// <summary>Forwards the server's ClientSubscribedTopic event.</summary>
public event ClientSubscribed OnClientSubscribed;

/// <summary>Forwards the server's ClientUnsubscribedTopic event.</summary>
public event ClientUnsubscribed OnClientUnsubscribed;

/// <summary>Raised when a message received by the server could not be decrypted.</summary>
public event ServerAuthenticationFailed OnServerAuthenticationFailed;
100  
/// <summary>
/// Records the connection settings and starts the endpoint matching <paramref name="type"/>.
/// </summary>
/// <param name="type">Whether to run as an MQTT client or as an MQTT server.</param>
/// <param name="ipAddress">Address to connect to (client) or to bind to (server).</param>
/// <param name="port">TCP port used together with <paramref name="ipAddress"/>.</param>
/// <param name="nick">Nickname to store in <see cref="Nick"/>.</param>
/// <param name="password">Password used to encrypt and decrypt message payloads.</param>
/// <returns>
/// The result of the selected start routine, or <c>false</c> when <paramref name="type"/> is
/// neither Client nor Server.
/// </returns>
public async Task<bool> Start(MqttCommunicationType type, IPAddress ipAddress, int port, string nick,
    string password)
{
    // Remember the settings first; the start routines read them from the properties.
    Type = type;
    IpAddress = ipAddress;
    Port = port;
    Nick = nick;
    Password = password;

    if (type == MqttCommunicationType.Client)
    {
        return await StartClient();
    }

    if (type == MqttCommunicationType.Server)
    {
        return await StartServer();
    }

    return false;
}
120  
/// <summary>
/// Configures the managed MQTT client for the stored address and port, subscribes to the
/// "lobby", "exchange" and "execute" topics and starts the client.
/// </summary>
/// <returns><c>true</c> once the client has been started.</returns>
/// <remarks>
/// Exceptions thrown while subscribing or starting are propagated to the caller, as before.
/// In that case the event handlers bound by this call are removed again, so a later retry
/// does not end up with every handler attached twice.
/// </remarks>
private async Task<bool> StartClient()
{
    var clientOptions = new MqttClientOptionsBuilder()
        .WithTcpServer(IpAddress.ToString(), Port)
        .Build();

    // Setup and start a managed MQTT client.
    var managedOptions = new ManagedMqttClientOptionsBuilder()
        .WithClientOptions(clientOptions)
        .Build();

    BindClientHandlers();

    try
    {
        foreach (var topic in new[] {"lobby", "exchange", "execute"})
        {
            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic(topic)
                    .Build()
            );
        }

        await Client.StartAsync(managedOptions);
    }
    catch
    {
        // Undo the binding made above; Running is deliberately left untouched because the
        // failure may be due to the client already running from an earlier start.
        UnbindClientHandlers();
        throw;
    }

    Running = true;

    return Running;
}
157  
/// <summary>
/// Detaches this instance's handlers from the managed client and then stops the client.
/// </summary>
private async Task StopClient()
{
    // Detach first so that events raised during shutdown are not forwarded.
    UnbindClientHandlers();

    var shutdown = Client.StopAsync();
    await shutdown;
}
164  
/// <summary>
/// Attaches this instance's forwarding handlers to the managed client's events.
/// </summary>
public void BindClientHandlers()
{
    // Message traffic.
    Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;

    // Connection life cycle.
    Client.ConnectingFailed += ClientOnConnectingFailed;
    Client.Connected += ClientOnConnected;
    Client.Disconnected += ClientOnDisconnected;
}
172  
/// <summary>
/// Detaches the forwarding handlers that <see cref="BindClientHandlers"/> attached to the
/// managed client's events.
/// </summary>
public void UnbindClientHandlers()
{
    // Message traffic.
    Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;

    // Connection life cycle.
    Client.ConnectingFailed -= ClientOnConnectingFailed;
    Client.Connected -= ClientOnConnected;
    Client.Disconnected -= ClientOnDisconnected;
}
180  
/// <summary>
/// Decrypts the payload of a message received by the client and raises
/// <see cref="OnMessageReceived"/>; if decryption fails, raises
/// <see cref="OnClientAuthenticationFailed"/> instead. Both events are raised on the
/// task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">The received message; its payload is replaced by the decrypted bytes.</param>
/// <remarks>
/// Only a failure of the decryption step counts as an authentication failure. Exceptions
/// thrown by event subscribers, and cancellation of the dispatch, are no longer reported
/// as authentication failures.
/// </remarks>
private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        Exception decryptionError = null;

        try
        {
            e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
        }
        catch (Exception ex)
        {
            decryptionError = ex;
        }

        if (decryptionError == null)
        {
            await Task.Factory.StartNew(
                () => OnMessageReceived?.Invoke(sender, e),
                CancellationToken, TaskCreationOptions.None, TaskScheduler);
        }
        else
        {
            await Task.Factory.StartNew(
                () => OnClientAuthenticationFailed?.Invoke(sender,
                    new MqttAuthenticationFailureEventArgs(e, decryptionError)),
                CancellationToken, TaskCreationOptions.None, TaskScheduler);
        }
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
    catch (Exception ex)
    {
        // A subscriber threw. As before this must not take the process down, but it is
        // recorded instead of being misreported as an authentication failure.
        System.Diagnostics.Trace.TraceError("MqttCommunication: client message handler failed: {0}", ex);
    }
}
197  
/// <summary>
/// Forwards the managed client's ConnectingFailed notification to
/// <see cref="OnClientConnectionFailed"/> subscribers on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Failure details; passed through unchanged.</param>
private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnClientConnectionFailed?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
203  
/// <summary>
/// Forwards the managed client's Disconnected notification to
/// <see cref="OnClientDisconnected"/> subscribers on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Disconnect details; passed through unchanged.</param>
private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnClientDisconnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
209  
/// <summary>
/// Forwards the managed client's Connected notification to
/// <see cref="OnClientConnected"/> subscribers on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Connection details; passed through unchanged.</param>
private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnClientConnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
215  
/// <summary>
/// Binds the server event handlers and starts the MQTT server on the stored address and port.
/// </summary>
/// <returns>
/// <c>true</c> when the server started; <c>false</c> when building the options or starting
/// the server threw. The exception is not propagated.
/// </returns>
/// <remarks>
/// On failure the handlers bound by this call are removed again, so a later retry does not
/// attach every handler twice, and the exception is written to the trace output instead of
/// being discarded.
/// </remarks>
private async Task<bool> StartServer()
{
    var optionsBuilder = new MqttServerOptionsBuilder()
        .WithDefaultEndpointBoundIPAddress(IpAddress)
        .WithDefaultEndpointPort(Port);

    BindServerHandlers();

    try
    {
        await Server.StartAsync(optionsBuilder.Build());

        Running = true;
    }
    catch (Exception ex)
    {
        // Undo the binding made above before reporting the failure through the return value.
        UnbindServerHandlers();

        System.Diagnostics.Trace.TraceError("MqttCommunication: starting the server failed: {0}", ex);

        Running = false;
    }

    return Running;
}
237  
/// <summary>
/// Detaches this instance's handlers from the server and then stops the server.
/// </summary>
private async Task StopServer()
{
    // Detach first so that events raised during shutdown are not forwarded.
    UnbindServerHandlers();

    var shutdown = Server.StopAsync();
    await shutdown;
}
244  
/// <summary>
/// Attaches this instance's forwarding handlers to the server's events.
/// </summary>
private void BindServerHandlers()
{
    // Message traffic.
    Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;

    // Per-client notifications.
    Server.ClientConnected += ServerOnClientConnected;
    Server.ClientDisconnected += ServerOnClientDisconnected;
    Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
    Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;

    // Server life cycle.
    Server.Started += ServerOnStarted;
    Server.Stopped += ServerOnStopped;
}
255  
/// <summary>
/// Detaches the forwarding handlers that <see cref="BindServerHandlers"/> attached to the
/// server's events.
/// </summary>
private void UnbindServerHandlers()
{
    // Message traffic.
    Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;

    // Per-client notifications.
    Server.ClientConnected -= ServerOnClientConnected;
    Server.ClientDisconnected -= ServerOnClientDisconnected;
    Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
    Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;

    // Server life cycle.
    Server.Started -= ServerOnStarted;
    Server.Stopped -= ServerOnStopped;
}
266  
/// <summary>
/// Decrypts the payload of a message received by the server and raises
/// <see cref="OnMessageReceived"/>. If decryption fails, every client session whose id
/// equals the sender's client id is disconnected and
/// <see cref="OnServerAuthenticationFailed"/> is raised. Both events are raised on the
/// task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">The received message; its payload is replaced by the decrypted bytes.</param>
/// <remarks>
/// Only a failure of the decryption step counts as an authentication failure. Exceptions
/// thrown by event subscribers, and cancellation of the dispatch, no longer disconnect the
/// sending client or get reported as authentication failures.
/// </remarks>
private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        Exception decryptionError = null;

        try
        {
            e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);
        }
        catch (Exception ex)
        {
            decryptionError = ex;
        }

        if (decryptionError == null)
        {
            await Task.Factory.StartNew(
                () => OnMessageReceived?.Invoke(sender, e),
                CancellationToken, TaskCreationOptions.None, TaskScheduler);

            return;
        }

        // The payload could not be decrypted: drop the session(s) of the sending client.
        foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync())
        {
            if (string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal))
            {
                await clientSessionStatus.DisconnectAsync();
            }
        }

        await Task.Factory.StartNew(
            () => OnServerAuthenticationFailed?.Invoke(sender,
                new MqttAuthenticationFailureEventArgs(e, decryptionError)),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
    catch (Exception ex)
    {
        // A subscriber or the disconnect step threw. It is recorded here because an
        // exception escaping an async void method cannot be observed by any caller.
        System.Diagnostics.Trace.TraceError("MqttCommunication: server message handler failed: {0}", ex);
    }
}
291  
/// <summary>
/// Forwards the server's ClientUnsubscribedTopic notification to
/// <see cref="OnClientUnsubscribed"/> subscribers on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Unsubscription details; passed through unchanged.</param>
private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnClientUnsubscribed?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
297  
/// <summary>
/// Forwards the server's ClientSubscribedTopic notification to
/// <see cref="OnClientSubscribed"/> subscribers on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Subscription details; passed through unchanged.</param>
private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnClientSubscribed?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
303  
/// <summary>
/// Forwards the server's ClientDisconnected notification to
/// <see cref="OnServerClientDisconnected"/> subscribers on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Disconnect details; passed through unchanged.</param>
private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnServerClientDisconnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
309  
/// <summary>
/// Forwards the server's ClientConnected notification to
/// <see cref="OnServerClientConnected"/> subscribers on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Connection details; passed through unchanged.</param>
private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnServerClientConnected?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
315  
/// <summary>
/// Forwards the server's Stopped notification to <see cref="OnServerStopped"/> subscribers
/// on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Event data; passed through unchanged.</param>
private async void ServerOnStopped(object sender, EventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnServerStopped?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
321  
/// <summary>
/// Forwards the server's Started notification to <see cref="OnServerStarted"/> subscribers
/// on the task scheduler held by this instance.
/// </summary>
/// <param name="sender">Object that raised the underlying event; passed through unchanged.</param>
/// <param name="e">Event data; passed through unchanged.</param>
private async void ServerOnStarted(object sender, EventArgs e)
{
    try
    {
        await Task.Factory.StartNew(
            () => OnServerStarted?.Invoke(sender, e),
            CancellationToken, TaskCreationOptions.None, TaskScheduler);
    }
    catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
    {
        // Shutdown was requested; the notification is dropped on purpose. Letting the
        // cancellation escape an async void method would raise an unhandled exception.
    }
}
327  
/// <summary>
/// Stops the endpoint selected by <see cref="Type"/> and clears <see cref="Running"/>.
/// </summary>
/// <remarks>
/// <see cref="Running"/> is reset even when <see cref="Type"/> is neither Server nor Client.
/// </remarks>
public async Task Stop()
{
    var role = Type;

    if (role == MqttCommunicationType.Server)
    {
        await StopServer();
    }
    else if (role == MqttCommunicationType.Client)
    {
        await StopClient();
    }

    Running = false;
}
342  
/// <summary>
/// Encrypts <paramref name="payload"/> with the stored password and publishes it on
/// <paramref name="topic"/> through the client or the server, depending on <see cref="Type"/>.
/// </summary>
/// <param name="topic">MQTT topic to publish on.</param>
/// <param name="payload">Plain payload bytes; they are passed to AES.Encrypt before sending.</param>
/// <remarks>
/// The message is sent with quality of service "at most once". When <see cref="Type"/> is
/// neither Client nor Server the payload is still encrypted but nothing is published.
/// </remarks>
public async Task Broadcast(string topic, byte[] payload)
{
    // The encrypted bytes are used directly; copying them through a MemoryStream first
    // only produced an identical second array.
    var encryptedPayload = await AES.Encrypt(payload, Password);

    var message = new MqttApplicationMessage
    {
        Topic = topic,
        Payload = encryptedPayload,
        QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
    };

    switch (Type)
    {
        case MqttCommunicationType.Client:
            await Client.PublishAsync(new[] {message});
            break;
        case MqttCommunicationType.Server:
            await Server.PublishAsync(new[] {message});
            break;
    }
}
374 }
375 }