WingMan – Blame information for rev 12
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
10 | office | 1 | using System; |
12 | office | 2 | using System.IO; |
10 | office | 3 | using System.Net; |
4 | using System.Text; |
||
5 | using System.Threading; |
||
6 | using System.Threading.Tasks; |
||
7 | using MQTTnet; |
||
8 | using MQTTnet.Client; |
||
9 | using MQTTnet.Extensions.ManagedClient; |
||
10 | using MQTTnet.Protocol; |
||
11 | using MQTTnet.Server; |
||
12 | using WingMan.Utilities; |
||
13 | using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; |
||
14 | using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; |
||
15 | |||
namespace WingMan.Communication
{
    /// <summary>
    ///     Wraps an MQTTnet managed client and an MQTTnet server behind a single start/stop/broadcast API.
    ///     Payloads are passed through <c>AES.Encrypt</c> / <c>AES.Decrypt</c> using the configured password,
    ///     and every public event is raised on the <see cref="System.Threading.Tasks.TaskScheduler" /> supplied
    ///     to the constructor.
    /// </summary>
    public class MqttCommunication : IDisposable
    {
        public delegate void ClientAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

        public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

        public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

        public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

        public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

        public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

        public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

        public delegate void ServerAuthenticationFailed(object sender, MqttAuthenticationFailureEventArgs e);

        public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

        public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);

        public delegate void ServerStarted(object sender, EventArgs e);

        public delegate void ServerStopped(object sender, EventArgs e);

        /// <summary>
        ///     Creates the managed client and the server instances; nothing is started until
        ///     <see cref="Start" /> is called.
        /// </summary>
        /// <param name="taskScheduler">Scheduler on which all public events are raised.</param>
        /// <param name="cancellationToken">Token that, once cancelled, suppresses further event dispatch.</param>
        public MqttCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
        {
            TaskScheduler = taskScheduler;
            CancellationToken = cancellationToken;

            Client = new MqttFactory().CreateManagedMqttClient();
            Server = new MqttFactory().CreateMqttServer();
        }

        private TaskScheduler TaskScheduler { get; }

        private IManagedMqttClient Client { get; }

        private IMqttServer Server { get; }

        /// <summary>Set to true after a successful start and back to false by <see cref="Stop" />.</summary>
        public bool Running { get; set; }

        /// <summary>Nickname passed to the last <see cref="Start" /> call.</summary>
        public string Nick { get; set; }

        private IPAddress IpAddress { get; set; }

        private int Port { get; set; }

        private string Password { get; set; }

        private CancellationToken CancellationToken { get; }

        /// <summary>Mode (client or server) selected by the last <see cref="Start" /> call.</summary>
        public MqttCommunicationType Type { get; set; }

        /// <summary>
        ///     Stops whichever side is active. The stop runs asynchronously, so this method may return
        ///     before the stop has completed.
        /// </summary>
        public async void Dispose()
        {
            try
            {
                await Stop();
            }
            catch (Exception ex)
            {
                // Dispose is async void: an exception escaping here cannot be observed by the caller
                // and would surface as an unhandled exception, so record it instead of rethrowing.
                System.Diagnostics.Trace.TraceError("MqttCommunication.Dispose failed to stop cleanly: {0}", ex);
            }
        }

        public event ClientAuthenticationFailed OnClientAuthenticationFailed;

        public event ServerAuthenticationFailed OnServerAuthenticationFailed;

        public event MessageReceived OnMessageReceived;

        public event ClientConnected OnClientConnected;

        public event ClientDisconnected OnClientDisconnected;

        public event ClientConnectionFailed OnClientConnectionFailed;

        public event ClientUnsubscribed OnClientUnsubscribed;

        public event ClientSubscribed OnClientSubscribed;

        public event ServerClientDisconnected OnServerClientDisconnected;

        public event ServerClientConnected OnServerClientConnected;

        public event ServerStarted OnServerStarted;

        public event ServerStopped OnServerStopped;

        /// <summary>
        ///     Stores the connection settings and starts either the client or the server.
        /// </summary>
        /// <param name="type">Whether to run as client or as server.</param>
        /// <param name="ipAddress">Address to connect to (client) or to bind the default endpoint to (server).</param>
        /// <param name="port">TCP port to connect to (client) or to listen on (server).</param>
        /// <param name="nick">Nickname stored in <see cref="Nick" />.</param>
        /// <param name="password">Password handed to <c>AES.Encrypt</c> / <c>AES.Decrypt</c>.</param>
        /// <returns>
        ///     The value of <see cref="Running" /> after the start attempt, or false when
        ///     <paramref name="type" /> is neither client nor server.
        /// </returns>
        public async Task<bool> Start(MqttCommunicationType type, IPAddress ipAddress, int port, string nick,
            string password)
        {
            Type = type;
            IpAddress = ipAddress;
            Port = port;
            Nick = nick;
            Password = password;

            switch (type)
            {
                case MqttCommunicationType.Client:
                    return await StartClient();
                case MqttCommunicationType.Server:
                    return await StartServer();
                default:
                    return false;
            }
        }

        /// <summary>
        ///     Binds the client handlers, subscribes to the "lobby", "exchange" and "execute" topics and
        ///     starts the managed client. Exceptions from the MQTT client propagate to the caller.
        /// </summary>
        private async Task<bool> StartClient()
        {
            var clientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(IpAddress.ToString(), Port);

            // Setup and start a managed MQTT client.
            var options = new ManagedMqttClientOptionsBuilder()
                .WithClientOptions(clientOptions.Build())
                .Build();

            BindClientHandlers();

            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic("lobby")
                    .Build()
            );

            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic("exchange")
                    .Build()
            );

            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic("execute")
                    .Build()
            );

            await Client.StartAsync(options);

            Running = true;

            return Running;
        }

        /// <summary>Detaches the client handlers and stops the managed client.</summary>
        private async Task StopClient()
        {
            UnbindClientHandlers();

            await Client.StopAsync();
        }

        /// <summary>
        ///     Attaches this instance's handlers to the managed client's events. Safe to call repeatedly:
        ///     each handler ends up registered exactly once.
        /// </summary>
        public void BindClientHandlers()
        {
            // Detach first so a repeated Start (or a retry after a failed one) cannot register a
            // handler twice and make every event fire more than once.
            UnbindClientHandlers();

            Client.Connected += ClientOnConnected;
            Client.Disconnected += ClientOnDisconnected;
            Client.ConnectingFailed += ClientOnConnectingFailed;
            Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
        }

        /// <summary>Detaches this instance's handlers from the managed client's events.</summary>
        public void UnbindClientHandlers()
        {
            Client.Connected -= ClientOnConnected;
            Client.Disconnected -= ClientOnDisconnected;
            Client.ConnectingFailed -= ClientOnConnectingFailed;
            Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
        }

        /// <summary>
        ///     Runs <paramref name="raise" /> on <see cref="TaskScheduler" /> and waits for it to finish.
        ///     When <see cref="CancellationToken" /> has been cancelled the callback is skipped and the
        ///     resulting cancellation is absorbed; any other exception thrown by the callback propagates.
        /// </summary>
        /// <param name="raise">Callback that invokes one of the public events.</param>
        private async Task RaiseAsync(Action raise)
        {
            try
            {
                await Task.Factory.StartNew(raise, CancellationToken, TaskCreationOptions.None, TaskScheduler);
            }
            catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
            {
                // Shutdown was requested: drop the event. Letting this escape would either crash an
                // async void handler or be mistaken for a decryption failure by the message handlers.
            }
        }

        /// <summary>
        ///     Decrypts the incoming payload in place and raises <see cref="OnMessageReceived" />; any
        ///     exception raised while doing so is reported through <see cref="OnClientAuthenticationFailed" />.
        /// </summary>
        private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            try
            {
                e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);

                await RaiseAsync(() => OnMessageReceived?.Invoke(sender, e));
            }
            catch (Exception ex)
            {
                await RaiseAsync(
                    () => OnClientAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)));
            }
        }

        /// <summary>Forwards the managed client's connecting-failed event to <see cref="OnClientConnectionFailed" />.</summary>
        private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
        {
            await RaiseAsync(() => OnClientConnectionFailed?.Invoke(sender, e));
        }

        /// <summary>Forwards the managed client's disconnected event to <see cref="OnClientDisconnected" />.</summary>
        private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            await RaiseAsync(() => OnClientDisconnected?.Invoke(sender, e));
        }

        /// <summary>Forwards the managed client's connected event to <see cref="OnClientConnected" />.</summary>
        private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
        {
            await RaiseAsync(() => OnClientConnected?.Invoke(sender, e));
        }

        /// <summary>
        ///     Binds the server handlers and starts the server on the configured address and port.
        /// </summary>
        /// <returns>True when the server started; false when starting threw an exception.</returns>
        private async Task<bool> StartServer()
        {
            var optionsBuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpointBoundIPAddress(IpAddress)
                .WithDefaultEndpointPort(Port);

            BindServerHandlers();

            try
            {
                await Server.StartAsync(optionsBuilder.Build());

                Running = true;
            }
            catch (Exception)
            {
                // A failed start is reported to the caller through the return value only.
                Running = false;
            }

            return Running;
        }

        /// <summary>Detaches the server handlers and stops the server.</summary>
        private async Task StopServer()
        {
            UnbindServerHandlers();

            await Server.StopAsync();
        }

        /// <summary>
        ///     Attaches this instance's handlers to the server's events. Safe to call repeatedly:
        ///     each handler ends up registered exactly once.
        /// </summary>
        private void BindServerHandlers()
        {
            // Detach first so a repeated Start (or a retry after a failed one) cannot register a
            // handler twice and make every event fire more than once.
            UnbindServerHandlers();

            Server.Started += ServerOnStarted;
            Server.Stopped += ServerOnStopped;
            Server.ClientConnected += ServerOnClientConnected;
            Server.ClientDisconnected += ServerOnClientDisconnected;
            Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
            Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
            Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
        }

        /// <summary>Detaches this instance's handlers from the server's events.</summary>
        private void UnbindServerHandlers()
        {
            Server.Started -= ServerOnStarted;
            Server.Stopped -= ServerOnStopped;
            Server.ClientConnected -= ServerOnClientConnected;
            Server.ClientDisconnected -= ServerOnClientDisconnected;
            Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
            Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
            Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
        }

        /// <summary>
        ///     Decrypts the incoming payload in place and raises <see cref="OnMessageReceived" />. On any
        ///     exception the session whose client id matches the message's client id is disconnected and
        ///     <see cref="OnServerAuthenticationFailed" /> is raised.
        /// </summary>
        private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            try
            {
                e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password);

                await RaiseAsync(() => OnMessageReceived?.Invoke(sender, e));
            }
            catch (Exception ex)
            {
                foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync())
                {
                    if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal))
                    {
                        continue;
                    }

                    await clientSessionStatus.DisconnectAsync();
                }

                await RaiseAsync(
                    () => OnServerAuthenticationFailed?.Invoke(sender, new MqttAuthenticationFailureEventArgs(e, ex)));
            }
        }

        /// <summary>Forwards the server's client-unsubscribed event to <see cref="OnClientUnsubscribed" />.</summary>
        private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
        {
            await RaiseAsync(() => OnClientUnsubscribed?.Invoke(sender, e));
        }

        /// <summary>Forwards the server's client-subscribed event to <see cref="OnClientSubscribed" />.</summary>
        private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
        {
            await RaiseAsync(() => OnClientSubscribed?.Invoke(sender, e));
        }

        /// <summary>Forwards the server's client-disconnected event to <see cref="OnServerClientDisconnected" />.</summary>
        private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
        {
            await RaiseAsync(() => OnServerClientDisconnected?.Invoke(sender, e));
        }

        /// <summary>Forwards the server's client-connected event to <see cref="OnServerClientConnected" />.</summary>
        private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
        {
            await RaiseAsync(() => OnServerClientConnected?.Invoke(sender, e));
        }

        /// <summary>Forwards the server's stopped event to <see cref="OnServerStopped" />.</summary>
        private async void ServerOnStopped(object sender, EventArgs e)
        {
            await RaiseAsync(() => OnServerStopped?.Invoke(sender, e));
        }

        /// <summary>Forwards the server's started event to <see cref="OnServerStarted" />.</summary>
        private async void ServerOnStarted(object sender, EventArgs e)
        {
            await RaiseAsync(() => OnServerStarted?.Invoke(sender, e));
        }

        /// <summary>
        ///     Stops the side selected by <see cref="Type" /> and then clears <see cref="Running" />.
        /// </summary>
        public async Task Stop()
        {
            switch (Type)
            {
                case MqttCommunicationType.Server:
                    await StopServer();
                    break;
                case MqttCommunicationType.Client:
                    await StopClient();
                    break;
            }

            Running = false;
        }

        /// <summary>
        ///     Encrypts <paramref name="payload" /> with the configured password and publishes it on
        ///     <paramref name="topic" /> with QoS "at most once", through the client or the server
        ///     depending on <see cref="Type" />.
        /// </summary>
        /// <param name="topic">Topic to publish on.</param>
        /// <param name="payload">Plain payload; it is passed to <c>AES.Encrypt</c> before publishing.</param>
        public async Task Broadcast(string topic, byte[] payload)
        {
            // The encrypted bytes are used directly; no intermediate stream or extra copy is needed.
            var message = new MqttApplicationMessage
            {
                Topic = topic,
                Payload = await AES.Encrypt(payload, Password),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
            };

            switch (Type)
            {
                case MqttCommunicationType.Client:
                    await Client.PublishAsync(new[] { message });
                    break;
                case MqttCommunicationType.Server:
                    await Server.PublishAsync(new[] { message });
                    break;
            }
        }
    }
}