WingMan – Blame information for rev
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
5 | office | 1 | using System; |
8 | office | 2 | using System.Collections.Generic; |
3 | using System.Linq; |
||
5 | office | 4 | using System.Net; |
6 | office | 5 | using System.Threading; |
5 | office | 6 | using System.Threading.Tasks; |
7 | using MQTTnet; |
||
8 | using MQTTnet.Client; |
||
9 | using MQTTnet.Extensions.ManagedClient; |
||
8 | office | 10 | using MQTTnet.Protocol; |
5 | office | 11 | using MQTTnet.Server; |
8 | office | 12 | using WingMan.Utilities; |
5 | office | 13 | using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; |
14 | using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; |
||
15 | |||
16 | namespace WingMan.Communication |
||
17 | { |
||
7 | office | 18 | public class MQTTCommunication : IDisposable |
5 | office | 19 | { |
8 | office | 20 | public delegate void ClientAuthenticationFailed(object sender, EventArgs e); |
21 | |||
7 | office | 22 | public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e); |
23 | |||
24 | public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e); |
||
25 | |||
26 | public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e); |
||
27 | |||
28 | public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e); |
||
29 | |||
30 | public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e); |
||
31 | |||
32 | public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e); |
||
33 | |||
8 | office | 34 | public delegate void ServerAuthenticationFailed(object sender, EventArgs e); |
35 | |||
7 | office | 36 | public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e); |
37 | |||
38 | public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e); |
||
39 | |||
40 | public delegate void ServerStarted(object sender, EventArgs e); |
||
41 | |||
42 | public delegate void ServerStopped(object sender, EventArgs e); |
||
43 | |||
44 | public MQTTCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken) |
||
5 | office | 45 | { |
6 | office | 46 | TaskScheduler = taskScheduler; |
7 | office | 47 | CancellationToken = cancellationToken; |
5 | office | 48 | |
8 | office | 49 | TrackedClients = new TrackedClients {Clients = new List<TrackedClient>()}; |
50 | |||
5 | office | 51 | Client = new MqttFactory().CreateManagedMqttClient(); |
52 | Server = new MqttFactory().CreateMqttServer(); |
||
53 | } |
||
54 | |||
8 | office | 55 | private TrackedClients TrackedClients { get; } |
56 | |||
7 | office | 57 | private TaskScheduler TaskScheduler { get; } |
6 | office | 58 | |
5 | office | 59 | private IManagedMqttClient Client { get; } |
60 | |||
61 | private IMqttServer Server { get; } |
||
62 | |||
63 | public bool Running { get; set; } |
||
64 | |||
65 | public string Nick { get; set; } |
||
66 | |||
67 | private IPAddress IPAddress { get; set; } |
||
68 | |||
69 | private int Port { get; set; } |
||
70 | |||
8 | office | 71 | private string Password { get; set; } |
72 | |||
7 | office | 73 | private CancellationToken CancellationToken { get; } |
6 | office | 74 | |
5 | office | 75 | public MQTTCommunicationType Type { get; set; } |
76 | |||
7 | office | 77 | public async void Dispose() |
78 | { |
||
79 | await Stop(); |
||
80 | } |
||
5 | office | 81 | |
8 | office | 82 | public event ClientAuthenticationFailed OnClientAuthenticationFailed; |
83 | |||
84 | public event ServerAuthenticationFailed OnServerAuthenticationFailed; |
||
85 | |||
5 | office | 86 | public event MessageReceived OnMessageReceived; |
87 | |||
88 | public event ClientConnected OnClientConnected; |
||
89 | |||
90 | public event ClientDisconnected OnClientDisconnected; |
||
91 | |||
92 | public event ClientConnectionFailed OnClientConnectionFailed; |
||
93 | |||
94 | public event ClientUnsubscribed OnClientUnsubscribed; |
||
95 | |||
96 | public event ClientSubscribed OnClientSubscribed; |
||
97 | |||
98 | public event ServerClientDisconnected OnServerClientDisconnected; |
||
99 | |||
100 | public event ServerClientConnected OnServerClientConnected; |
||
101 | |||
102 | public event ServerStarted OnServerStarted; |
||
103 | |||
104 | public event ServerStopped OnServerStopped; |
||
105 | |||
8 | office | 106 | public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick, string password) |
5 | office | 107 | { |
108 | Type = type; |
||
109 | IPAddress = ipAddress; |
||
110 | Port = port; |
||
111 | Nick = nick; |
||
8 | office | 112 | Password = password; |
5 | office | 113 | |
114 | switch (type) |
||
115 | { |
||
116 | case MQTTCommunicationType.Client: |
||
117 | await StartClient().ConfigureAwait(false); |
||
118 | break; |
||
119 | case MQTTCommunicationType.Server: |
||
120 | await StartServer().ConfigureAwait(false); |
||
121 | break; |
||
122 | } |
||
123 | } |
||
124 | |||
125 | private async Task StartClient() |
||
126 | { |
||
127 | var clientOptions = new MqttClientOptionsBuilder() |
||
128 | .WithTcpServer(IPAddress.ToString(), Port); |
||
129 | |||
130 | // Setup and start a managed MQTT client. |
||
131 | var options = new ManagedMqttClientOptionsBuilder() |
||
132 | .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) |
||
133 | .WithClientOptions(clientOptions.Build()) |
||
134 | .Build(); |
||
135 | |||
136 | BindClientHandlers(); |
||
137 | |||
138 | await Client.SubscribeAsync( |
||
139 | new TopicFilterBuilder() |
||
140 | .WithTopic("lobby") |
||
141 | .Build() |
||
142 | ).ConfigureAwait(false); |
||
143 | |||
144 | await Client.SubscribeAsync( |
||
145 | new TopicFilterBuilder() |
||
146 | .WithTopic("exchange") |
||
147 | .Build() |
||
148 | ).ConfigureAwait(false); |
||
149 | |||
150 | await Client.StartAsync(options).ConfigureAwait(false); |
||
151 | |||
152 | Running = true; |
||
153 | } |
||
154 | |||
155 | private async Task StopClient() |
||
156 | { |
||
157 | UnbindClientHandlers(); |
||
158 | |||
159 | await Client.StopAsync().ConfigureAwait(false); |
||
160 | } |
||
161 | |||
162 | public void BindClientHandlers() |
||
163 | { |
||
164 | Client.Connected += ClientOnConnected; |
||
165 | Client.Disconnected += ClientOnDisconnected; |
||
166 | Client.ConnectingFailed += ClientOnConnectingFailed; |
||
167 | Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived; |
||
168 | } |
||
169 | |||
170 | public void UnbindClientHandlers() |
||
171 | { |
||
172 | Client.Connected -= ClientOnConnected; |
||
173 | Client.Disconnected -= ClientOnDisconnected; |
||
174 | Client.ConnectingFailed -= ClientOnConnectingFailed; |
||
175 | Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived; |
||
176 | } |
||
177 | |||
6 | office | 178 | private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) |
5 | office | 179 | { |
8 | office | 180 | try |
181 | { |
||
182 | e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); |
||
183 | } |
||
184 | catch (Exception) |
||
185 | { |
||
186 | await Task.Delay(0).ContinueWith(_ => OnClientAuthenticationFailed?.Invoke(sender, e), |
||
187 | CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
||
188 | |||
189 | return; |
||
190 | } |
||
191 | |||
6 | office | 192 | await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), |
7 | office | 193 | CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
5 | office | 194 | } |
195 | |||
6 | office | 196 | private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e) |
5 | office | 197 | { |
6 | office | 198 | await Task.Delay(0).ContinueWith(_ => OnClientConnectionFailed?.Invoke(sender, e), |
7 | office | 199 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 200 | } |
201 | |||
6 | office | 202 | private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e) |
5 | office | 203 | { |
6 | office | 204 | await Task.Delay(0).ContinueWith(_ => OnClientDisconnected?.Invoke(sender, e), |
7 | office | 205 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 206 | } |
207 | |||
6 | office | 208 | private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e) |
5 | office | 209 | { |
6 | office | 210 | await Task.Delay(0).ContinueWith(_ => OnClientConnected?.Invoke(sender, e), |
7 | office | 211 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 212 | } |
213 | |||
214 | private async Task StartServer() |
||
215 | { |
||
216 | var optionsBuilder = new MqttServerOptionsBuilder() |
||
217 | .WithDefaultEndpointBoundIPAddress(IPAddress) |
||
218 | .WithSubscriptionInterceptor(MQTTSubscriptionIntercept) |
||
8 | office | 219 | .WithConnectionValidator(MQTTConnectionValidator) |
5 | office | 220 | .WithDefaultEndpointPort(Port); |
221 | |||
222 | BindServerHandlers(); |
||
223 | |||
224 | await Server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false); |
||
225 | |||
226 | Running = true; |
||
227 | } |
||
228 | |||
8 | office | 229 | private void MQTTConnectionValidator(MqttConnectionValidatorContext context) |
230 | { |
||
231 | // Do not accept connections from banned clients. |
||
232 | if (TrackedClients.Clients.Any(client => |
||
233 | (string.Equals(client.EndPoint, context.Endpoint, StringComparison.OrdinalIgnoreCase) || |
||
234 | string.Equals(client.ClientId, context.ClientId, StringComparison.Ordinal)) && |
||
235 | client.Banned)) |
||
236 | { |
||
237 | context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized; |
||
238 | return; |
||
239 | } |
||
240 | |||
241 | TrackedClients.Clients.Add(new TrackedClient {ClientId = context.ClientId, EndPoint = context.Endpoint}); |
||
242 | |||
243 | context.ReturnCode = MqttConnectReturnCode.ConnectionAccepted; |
||
244 | } |
||
245 | |||
5 | office | 246 | private async Task StopServer() |
247 | { |
||
248 | UnbindServerHandlers(); |
||
249 | |||
250 | await Server.StopAsync().ConfigureAwait(false); |
||
251 | } |
||
252 | |||
253 | private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context) |
||
254 | { |
||
255 | if (context.TopicFilter.Topic != "lobby" && |
||
256 | context.TopicFilter.Topic != "exchange") |
||
257 | { |
||
258 | context.AcceptSubscription = false; |
||
259 | context.CloseConnection = true; |
||
260 | return; |
||
261 | } |
||
262 | |||
263 | context.AcceptSubscription = true; |
||
264 | context.CloseConnection = false; |
||
265 | } |
||
266 | |||
267 | private void BindServerHandlers() |
||
268 | { |
||
269 | Server.Started += ServerOnStarted; |
||
270 | Server.Stopped += ServerOnStopped; |
||
271 | Server.ClientConnected += ServerOnClientConnected; |
||
272 | Server.ClientDisconnected += ServerOnClientDisconnected; |
||
273 | Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic; |
||
274 | Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic; |
||
275 | Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived; |
||
276 | } |
||
277 | |||
278 | private void UnbindServerHandlers() |
||
279 | { |
||
280 | Server.Started -= ServerOnStarted; |
||
281 | Server.Stopped -= ServerOnStopped; |
||
282 | Server.ClientConnected -= ServerOnClientConnected; |
||
283 | Server.ClientDisconnected -= ServerOnClientDisconnected; |
||
284 | Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic; |
||
285 | Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic; |
||
286 | Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived; |
||
287 | } |
||
288 | |||
6 | office | 289 | private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) |
5 | office | 290 | { |
8 | office | 291 | try |
292 | { |
||
293 | e.ApplicationMessage.Payload = await AES.Decrypt(e.ApplicationMessage.Payload, Password); |
||
294 | } |
||
295 | catch (Exception) |
||
296 | { |
||
297 | // Decryption failed, assume a rogue client and ban the client. |
||
298 | foreach (var client in TrackedClients.Clients) |
||
299 | { |
||
300 | if (!string.Equals(client.ClientId, e.ClientId, StringComparison.Ordinal)) |
||
301 | continue; |
||
302 | |||
303 | client.Banned = true; |
||
304 | |||
305 | foreach (var clientSessionStatus in await Server.GetClientSessionsStatusAsync()) |
||
306 | { |
||
307 | if (!string.Equals(clientSessionStatus.ClientId, e.ClientId, StringComparison.Ordinal) && |
||
308 | !string.Equals(clientSessionStatus.Endpoint, client.EndPoint, StringComparison.Ordinal)) |
||
309 | continue; |
||
310 | |||
311 | await clientSessionStatus.ClearPendingApplicationMessagesAsync(); |
||
312 | await clientSessionStatus.DisconnectAsync(); |
||
313 | } |
||
314 | } |
||
315 | |||
316 | await Task.Delay(0).ContinueWith(_ => OnServerAuthenticationFailed?.Invoke(sender, e), |
||
317 | CancellationToken, TaskContinuationOptions.None, TaskScheduler); |
||
318 | |||
319 | return; |
||
320 | } |
||
321 | |||
6 | office | 322 | await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e), |
7 | office | 323 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 324 | } |
325 | |||
6 | office | 326 | private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e) |
5 | office | 327 | { |
6 | office | 328 | await Task.Delay(0).ContinueWith(_ => OnClientUnsubscribed?.Invoke(sender, e), |
7 | office | 329 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 330 | } |
331 | |||
6 | office | 332 | private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e) |
5 | office | 333 | { |
6 | office | 334 | await Task.Delay(0).ContinueWith(_ => OnClientSubscribed?.Invoke(sender, e), |
7 | office | 335 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 336 | } |
337 | |||
6 | office | 338 | private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e) |
5 | office | 339 | { |
6 | office | 340 | await Task.Delay(0).ContinueWith(_ => OnServerClientDisconnected?.Invoke(sender, e), |
7 | office | 341 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 342 | } |
343 | |||
6 | office | 344 | private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e) |
5 | office | 345 | { |
6 | office | 346 | await Task.Delay(0).ContinueWith(_ => OnServerClientConnected?.Invoke(sender, e), |
7 | office | 347 | CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false); |
5 | office | 348 | } |
349 | |||
350 | private void ServerOnStopped(object sender, EventArgs e) |
||
351 | { |
||
352 | OnServerStopped?.Invoke(sender, e); |
||
353 | } |
||
354 | |||
355 | private void ServerOnStarted(object sender, EventArgs e) |
||
356 | { |
||
357 | OnServerStarted?.Invoke(sender, e); |
||
358 | } |
||
359 | |||
360 | public async Task Stop() |
||
361 | { |
||
362 | switch (Type) |
||
363 | { |
||
364 | case MQTTCommunicationType.Server: |
||
365 | await StopServer().ConfigureAwait(false); |
||
366 | break; |
||
367 | case MQTTCommunicationType.Client: |
||
368 | await StopClient().ConfigureAwait(false); |
||
369 | break; |
||
370 | } |
||
371 | |||
372 | Running = false; |
||
373 | } |
||
374 | |||
375 | public async Task Broadcast(string topic, byte[] payload) |
||
376 | { |
||
8 | office | 377 | // Encrypt the payload. |
378 | var encryptedPayload = await AES.Encrypt(payload, Password); |
||
379 | |||
5 | office | 380 | switch (Type) |
381 | { |
||
382 | case MQTTCommunicationType.Client: |
||
8 | office | 383 | |
5 | office | 384 | await Client.PublishAsync(new ManagedMqttApplicationMessage |
385 | { |
||
8 | office | 386 | ApplicationMessage = new MqttApplicationMessage {Topic = topic, Payload = encryptedPayload} |
5 | office | 387 | }).ConfigureAwait(false); |
388 | break; |
||
389 | case MQTTCommunicationType.Server: |
||
8 | office | 390 | await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = encryptedPayload}) |
7 | office | 391 | .ConfigureAwait(false); |
5 | office | 392 | break; |
393 | } |
||
394 | } |
||
395 | } |
||
8 | office | 396 | } |