WingMan – Blame information for rev 12

Subversion Repositories:
Rev:
Rev Author Line No. Line
10 office 1 using System;
12 office 2 using System.IO;
10 office 3 using System.Net;
4 using System.Text;
5 using System.Threading;
6 using System.Threading.Tasks;
7 using MQTTnet;
8 using MQTTnet.Client;
9 using MQTTnet.Extensions.ManagedClient;
10 using MQTTnet.Protocol;
11 using MQTTnet.Server;
12 using WingMan.Utilities;
13 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
14 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
15  
16 namespace WingMan.Communication
17 {
/// <summary>
///     Facade over a managed MQTT client and an MQTT server. Depending on the
///     <see cref="MqttCommunicationType" /> passed to <see cref="Start" />, either the client or the
///     server is started; payloads are encrypted on <see cref="Broadcast" /> and decrypted on receipt
///     using the password supplied to <see cref="Start" />. All public events are raised on the
///     <see cref="System.Threading.Tasks.TaskScheduler" /> handed to the constructor.
/// </summary>
public class MqttCommunication : IDisposable
{
    public delegate void ClientAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

    public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

    public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

    public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

    public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

    public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

    public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

    public delegate void ServerAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

    public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

    public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);

    public delegate void ServerStarted(object sender, EventArgs e);

    public delegate void ServerStopped(object sender, EventArgs e);

    /// <summary>Topics the client subscribes to, in subscription order, when it is started.</summary>
    private static readonly string[] ClientTopics = {"lobby", "exchange", "execute"};

    /// <summary>
    ///     Creates the (not yet started) managed client and server.
    /// </summary>
    /// <param name="taskScheduler">Scheduler on which every public event of this class is raised.</param>
    /// <param name="cancellationToken">Token that cancels the dispatching of events.</param>
    public MqttCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
    {
        TaskScheduler = taskScheduler;
        CancellationToken = cancellationToken;

        Client = new MqttFactory().CreateManagedMqttClient();
        Server = new MqttFactory().CreateMqttServer();
    }

    private TaskScheduler TaskScheduler { get; }

    private IManagedMqttClient Client { get; }

    private IMqttServer Server { get; }

    /// <summary>Set to true after a successful start and to false by <see cref="Stop" />.</summary>
    public bool Running { get; set; }

    /// <summary>Nickname passed to the last call of <see cref="Start" />.</summary>
    public string Nick { get; set; }

    private IPAddress IpAddress { get; set; }

    private int Port { get; set; }

    /// <summary>Secret handed to the AES helper to encrypt outgoing and decrypt incoming payloads.</summary>
    private string Password { get; set; }

    private CancellationToken CancellationToken { get; }

    /// <summary>Mode selected by the last call of <see cref="Start" />; decides what <see cref="Stop" /> stops.</summary>
    public MqttCommunicationType Type { get; set; }

    /// <summary>
    ///     Stops whichever side is selected by <see cref="Type" />. The stop is asynchronous and is not
    ///     awaited by the caller, so this method may return before it has completed.
    /// </summary>
    public async void Dispose()
    {
        try
        {
            await Stop();
        }
        catch (Exception)
        {
            // Dispose must never throw. Because this method is async void, an exception escaping
            // here could not be caught by the caller and would surface as an unhandled exception
            // (for example when the instance is disposed without ever having been started).
            // Teardown is therefore best-effort.
        }
    }

    /// <summary>Raised when decrypting a message received by the client fails.</summary>
    public event ClientAuthenticationFailed OnClientAuthenticationFailed;

    /// <summary>Raised when decrypting a message received by the server fails.</summary>
    public event ServerAuthenticationFailed OnServerAuthenticationFailed;

    /// <summary>Raised, for both client and server, after a received payload has been decrypted.</summary>
    public event MessageReceived OnMessageReceived;

    /// <summary>Raised when the managed client reports that it connected.</summary>
    public event ClientConnected OnClientConnected;

    /// <summary>Raised when the managed client reports that it disconnected.</summary>
    public event ClientDisconnected OnClientDisconnected;

    /// <summary>Raised when the managed client reports a failed connection attempt.</summary>
    public event ClientConnectionFailed OnClientConnectionFailed;

    /// <summary>Raised when the server reports that a client unsubscribed from a topic.</summary>
    public event ClientUnsubscribed OnClientUnsubscribed;

    /// <summary>Raised when the server reports that a client subscribed to a topic.</summary>
    public event ClientSubscribed OnClientSubscribed;

    /// <summary>Raised when the server reports that a client disconnected.</summary>
    public event ServerClientDisconnected OnServerClientDisconnected;

    /// <summary>Raised when the server reports that a client connected.</summary>
    public event ServerClientConnected OnServerClientConnected;

    /// <summary>Raised when the server reports that it started.</summary>
    public event ServerStarted OnServerStarted;

    /// <summary>Raised when the server reports that it stopped.</summary>
    public event ServerStopped OnServerStopped;

    /// <summary>
    ///     Stores the connection settings and starts the client or the server.
    /// </summary>
    /// <param name="type">Whether to run as client or as server.</param>
    /// <param name="ipAddress">Server address to connect to (client) or address to bind to (server).</param>
    /// <param name="port">TCP port to connect to (client) or to listen on (server).</param>
    /// <param name="nick">Nickname, stored in <see cref="Nick" />.</param>
    /// <param name="password">Secret used to encrypt and decrypt message payloads.</param>
    /// <returns>
    ///     The value of <see cref="Running" /> after the start attempt, or false when
    ///     <paramref name="type" /> is neither client nor server.
    /// </returns>
    public async Task<bool> Start(MqttCommunicationType type, IPAddress ipAddress, int port, string nick,
        string password)
    {
        Type = type;
        IpAddress = ipAddress;
        Port = port;
        Nick = nick;
        Password = password;

        switch (type)
        {
            case MqttCommunicationType.Client:
                return await StartClient();
            case MqttCommunicationType.Server:
                return await StartServer();
        }

        return false;
    }

    /// <summary>
    ///     Binds the client handlers, registers the subscriptions listed in <see cref="ClientTopics" />
    ///     and then starts the managed client. Exceptions thrown while doing so propagate to the caller.
    /// </summary>
    private async Task<bool> StartClient()
    {
        var clientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(IpAddress.ToString(), Port);

        // Setup and start a managed MQTT client.
        var options = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(clientOptions.Build())
            .Build();

        BindClientHandlers();

        // Subscriptions are registered one after another, before the client is started.
        foreach (var topic in ClientTopics)
        {
            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic(topic)
                    .Build()
            );
        }

        await Client.StartAsync(options);

        Running = true;

        return Running;
    }

    /// <summary>Detaches the client handlers and stops the managed client.</summary>
    private async Task StopClient()
    {
        UnbindClientHandlers();

        await Client.StopAsync();
    }

    /// <summary>
    ///     Attaches this instance's handlers to the client events. Safe to call repeatedly: handlers
    ///     are detached first so that they are never registered more than once.
    /// </summary>
    public void BindClientHandlers()
    {
        // Without this, starting a second time would register every handler twice and each
        // client event would be delivered to subscribers twice.
        UnbindClientHandlers();

        Client.Connected += ClientOnConnected;
        Client.Disconnected += ClientOnDisconnected;
        Client.ConnectingFailed += ClientOnConnectingFailed;
        Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
    }

    /// <summary>Detaches this instance's handlers from the client events.</summary>
    public void UnbindClientHandlers()
    {
        Client.Connected -= ClientOnConnected;
        Client.Disconnected -= ClientOnDisconnected;
        Client.ConnectingFailed -= ClientOnConnectingFailed;
        Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
    }

    /// <summary>
    ///     Runs <paramref name="raise" /> on <see cref="TaskScheduler" /> and waits for it to finish.
    ///     If <see cref="CancellationToken" /> is cancelled the notification is dropped instead of
    ///     throwing; exceptions thrown by <paramref name="raise" /> itself are rethrown to the caller.
    /// </summary>
    /// <param name="raise">Delegate that invokes one of the public events.</param>
    private async Task RaiseAsync(Action raise)
    {
        try
        {
            // Task.Delay(0, token) only serves as an antecedent so that the continuation can be
            // queued to the configured scheduler.
            await Task.Delay(0, CancellationToken).ContinueWith(_ => raise(),
                CancellationToken, TaskContinuationOptions.None, TaskScheduler);
        }
        catch (OperationCanceledException)
        {
            // The token was cancelled, so the continuation never ran. All callers are async void
            // event handlers; letting the cancellation escape from them would be an unhandled
            // exception, so it is treated as "nothing to deliver".
        }
    }

    /// <summary>
    ///     Decrypts the payload of a message received by the client in place and raises
    ///     <see cref="OnMessageReceived" />; on failure raises <see cref="OnClientAuthenticationFailed" />.
    /// </summary>
    private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        try
        {
            e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);

            await RaiseAsync(() => OnMessageReceived?.Invoke(sender, e));
        }
        catch (Exception ex)
        {
            // NOTE(review): this also catches exceptions thrown by OnMessageReceived subscribers,
            // which are then reported as authentication failures - confirm that is intended.
            await RaiseAsync(() =>
                OnClientAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)));
        }
    }

    private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
    {
        await RaiseAsync(() => OnClientConnectionFailed?.Invoke(sender, e));
    }

    private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
    {
        await RaiseAsync(() => OnClientDisconnected?.Invoke(sender, e));
    }

    private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
    {
        await RaiseAsync(() => OnClientConnected?.Invoke(sender, e));
    }

    /// <summary>
    ///     Binds the server handlers and starts the server on the configured address and port.
    /// </summary>
    /// <returns>True when the server started; false when starting it threw an exception.</returns>
    private async Task<bool> StartServer()
    {
        var optionsBuilder = new MqttServerOptionsBuilder()
            .WithDefaultEndpointBoundIPAddress(IpAddress)
            .WithDefaultEndpointPort(Port);

        BindServerHandlers();

        try
        {
            await Server.StartAsync(optionsBuilder.Build());

            Running = true;
        }
        catch (Exception)
        {
            // A failed start is reported to the caller through the return value only.
            Running = false;
        }

        return Running;
    }

    /// <summary>Detaches the server handlers and stops the server.</summary>
    private async Task StopServer()
    {
        UnbindServerHandlers();

        await Server.StopAsync();
    }

    /// <summary>
    ///     Attaches this instance's handlers to the server events. Handlers are detached first so
    ///     that repeated starts never register them more than once.
    /// </summary>
    private void BindServerHandlers()
    {
        UnbindServerHandlers();

        Server.Started += ServerOnStarted;
        Server.Stopped += ServerOnStopped;
        Server.ClientConnected += ServerOnClientConnected;
        Server.ClientDisconnected += ServerOnClientDisconnected;
        Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
        Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
        Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
    }

    /// <summary>Detaches this instance's handlers from the server events.</summary>
    private void UnbindServerHandlers()
    {
        Server.Started -= ServerOnStarted;
        Server.Stopped -= ServerOnStopped;
        Server.ClientConnected -= ServerOnClientConnected;
        Server.ClientDisconnected -= ServerOnClientDisconnected;
        Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
        Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
        Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
    }

    /// <summary>
    ///     Decrypts the payload of a message received by the server in place and raises
    ///     <see cref="OnMessageReceived" />. On failure, disconnects every session whose client id
    ///     matches the sender of the message and raises <see cref="OnServerAuthenticationFailed" />.
    /// </summary>
    private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        try
        {
            e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);

            await RaiseAsync(() => OnMessageReceived?.Invoke(sender, e));
        }
        catch (Exception ex)
        {
            // NOTE(review): this also catches exceptions thrown by OnMessageReceived subscribers,
            // in which case the sending client is disconnected as well - confirm that is intended.
            foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync())
            {
                if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal))
                    continue;

                await clientSessionStatus.DisconnectAsync();
            }

            await RaiseAsync(() =>
                OnServerAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)));
        }
    }

    private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
    {
        await RaiseAsync(() => OnClientUnsubscribed?.Invoke(sender, e));
    }

    private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
    {
        await RaiseAsync(() => OnClientSubscribed?.Invoke(sender, e));
    }

    private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
    {
        await RaiseAsync(() => OnServerClientDisconnected?.Invoke(sender, e));
    }

    private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
    {
        await RaiseAsync(() => OnServerClientConnected?.Invoke(sender, e));
    }

    private async void ServerOnStopped(object sender, EventArgs e)
    {
        await RaiseAsync(() => OnServerStopped?.Invoke(sender, e));
    }

    private async void ServerOnStarted(object sender, EventArgs e)
    {
        await RaiseAsync(() => OnServerStarted?.Invoke(sender, e));
    }

    /// <summary>
    ///     Stops the server or the client, depending on <see cref="Type" />, and clears
    ///     <see cref="Running" />.
    /// </summary>
    public async Task Stop()
    {
        switch (Type)
        {
            case MqttCommunicationType.Server:
                await StopServer();
                break;
            case MqttCommunicationType.Client:
                await StopClient();
                break;
        }

        Running = false;
    }

    /// <summary>
    ///     Encrypts <paramref name="payload" /> with the configured password and publishes it on
    ///     <paramref name="topic" /> through the client or the server, depending on <see cref="Type" />.
    /// </summary>
    /// <param name="topic">MQTT topic to publish on.</param>
    /// <param name="payload">Plain payload; it is encrypted before being published.</param>
    public async Task Broadcast(string topic, byte[] payload)
    {
        // The encrypted bytes are used directly; wrapping them in a stream only to copy them out
        // again is unnecessary.
        var encryptedPayload = await AES.Encrypt(payload, Password);

        var messages = new[]
        {
            new MqttApplicationMessage
            {
                Topic = topic,
                Payload = encryptedPayload,
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
            }
        };

        switch (Type)
        {
            case MqttCommunicationType.Client:
                await Client.PublishAsync(messages);
                break;
            case MqttCommunicationType.Server:
                await Server.PublishAsync(messages);
                break;
        }
    }
}
376 }