WingMan – Blame information for rev
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
5 | office | 1 | using System; |
2 | using System.Net; |
||
6 | office | 3 | using System.Threading; |
5 | office | 4 | using System.Threading.Tasks; |
5 | using MQTTnet; |
||
6 | using MQTTnet.Client; |
||
7 | using MQTTnet.Extensions.ManagedClient; |
||
8 | using MQTTnet.Server; |
||
9 | using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; |
||
10 | using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; |
||
11 | |||
12 | namespace WingMan.Communication |
||
13 | { |
||
/// <summary>
///     Runs either an MQTT managed client or an MQTT server, selected by the
///     <see cref="MQTTCommunicationType" /> passed to <see cref="Start" />, and re-raises the
///     underlying client/server events on the <see cref="System.Threading.Tasks.TaskScheduler" />
///     supplied to the constructor.
/// </summary>
public class MQTTCommunication : IDisposable
{
    /// <summary>Handler signature for <see cref="OnClientConnected" />.</summary>
    public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

    /// <summary>Handler signature for <see cref="OnClientConnectionFailed" />.</summary>
    public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

    /// <summary>Handler signature for <see cref="OnClientDisconnected" />.</summary>
    public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

    /// <summary>Handler signature for <see cref="OnClientSubscribed" />.</summary>
    public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

    /// <summary>Handler signature for <see cref="OnClientUnsubscribed" />.</summary>
    public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

    /// <summary>Handler signature for <see cref="OnMessageReceived" />.</summary>
    public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

    /// <summary>Handler signature for <see cref="OnServerClientConnected" />.</summary>
    public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

    /// <summary>Handler signature for <see cref="OnServerClientDisconnected" />.</summary>
    public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);

    /// <summary>Handler signature for <see cref="OnServerStarted" />.</summary>
    public delegate void ServerStarted(object sender, EventArgs e);

    /// <summary>Handler signature for <see cref="OnServerStopped" />.</summary>
    public delegate void ServerStopped(object sender, EventArgs e);

    // The only two topics in use: the client subscribes to both of them and the server's
    // subscription interceptor rejects everything else.
    private const string LobbyTopic = "lobby";
    private const string ExchangeTopic = "exchange";

    /// <summary>
    ///     Creates the managed client and the server; neither is started until
    ///     <see cref="Start" /> is called.
    /// </summary>
    /// <param name="taskScheduler">Scheduler on which the relayed events are raised.</param>
    /// <param name="cancellationToken">Token that cancels the scheduling of relayed events.</param>
    public MQTTCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
    {
        TaskScheduler = taskScheduler;
        CancellationToken = cancellationToken;

        var factory = new MqttFactory();
        Client = factory.CreateManagedMqttClient();
        Server = factory.CreateMqttServer();
    }

    private TaskScheduler TaskScheduler { get; }

    private IManagedMqttClient Client { get; }

    private IMqttServer Server { get; }

    /// <summary>Set to true once a start has completed and back to false by <see cref="Stop" />.</summary>
    public bool Running { get; set; }

    /// <summary>Nickname handed to the most recent <see cref="Start" /> call.</summary>
    public string Nick { get; set; }

    private IPAddress IPAddress { get; set; }

    private int Port { get; set; }

    private CancellationToken CancellationToken { get; }

    /// <summary>Mode handed to the most recent <see cref="Start" /> call; decides what <see cref="Stop" /> stops.</summary>
    public MQTTCommunicationType Type { get; set; }

    // Guards Dispose against running the shutdown more than once.
    private bool Disposed { get; set; }

    /// <summary>
    ///     Stops whichever side is selected by <see cref="Type" /> and waits for the stop to finish.
    ///     Calling it again is a no-op. An exception thrown while stopping surfaces to the caller
    ///     instead of being raised on an unrelated thread.
    /// </summary>
    public void Dispose()
    {
        if (Disposed)
        {
            return;
        }

        Disposed = true;

        // Run the asynchronous stop on the thread pool so that blocking here cannot deadlock
        // against a synchronization context captured from the disposing thread.
        Task.Run(() => Stop()).GetAwaiter().GetResult();
    }

    /// <summary>Relayed from the client and from the server when an application message arrives.</summary>
    public event MessageReceived OnMessageReceived;

    /// <summary>Relayed from the managed client's Connected event.</summary>
    public event ClientConnected OnClientConnected;

    /// <summary>Relayed from the managed client's Disconnected event.</summary>
    public event ClientDisconnected OnClientDisconnected;

    /// <summary>Relayed from the managed client's ConnectingFailed event.</summary>
    public event ClientConnectionFailed OnClientConnectionFailed;

    /// <summary>Relayed from the server's ClientUnsubscribedTopic event.</summary>
    public event ClientUnsubscribed OnClientUnsubscribed;

    /// <summary>Relayed from the server's ClientSubscribedTopic event.</summary>
    public event ClientSubscribed OnClientSubscribed;

    /// <summary>Relayed from the server's ClientDisconnected event.</summary>
    public event ServerClientDisconnected OnServerClientDisconnected;

    /// <summary>Relayed from the server's ClientConnected event.</summary>
    public event ServerClientConnected OnServerClientConnected;

    /// <summary>Relayed from the server's Started event.</summary>
    public event ServerStarted OnServerStarted;

    /// <summary>Relayed from the server's Stopped event.</summary>
    public event ServerStopped OnServerStopped;

    /// <summary>
    ///     Records the connection settings and starts the client or the server. For a
    ///     <paramref name="type" /> that is neither Client nor Server the settings are stored
    ///     but nothing is started.
    /// </summary>
    /// <param name="type">Whether to run as client or as server.</param>
    /// <param name="ipAddress">Address to connect to (client) or to bind the endpoint to (server).</param>
    /// <param name="port">TCP port to connect to (client) or to listen on (server).</param>
    /// <param name="nick">Nickname stored in <see cref="Nick" />.</param>
    public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick)
    {
        Type = type;
        IPAddress = ipAddress;
        Port = port;
        Nick = nick;

        switch (type)
        {
            case MQTTCommunicationType.Client:
                await StartClient().ConfigureAwait(false);
                break;
            case MQTTCommunicationType.Server:
                await StartServer().ConfigureAwait(false);
                break;
        }
    }

    /// <summary>
    ///     Hooks the client events, subscribes to the lobby and exchange topics and starts the
    ///     managed client against the stored address and port.
    /// </summary>
    private async Task StartClient()
    {
        var clientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(IPAddress.ToString(), Port)
            .Build();

        // Managed client options: reconnect attempts are spaced five seconds apart.
        var options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithClientOptions(clientOptions)
            .Build();

        BindClientHandlers();

        await Client.SubscribeAsync(
            new TopicFilterBuilder()
                .WithTopic(LobbyTopic)
                .Build()
        ).ConfigureAwait(false);

        await Client.SubscribeAsync(
            new TopicFilterBuilder()
                .WithTopic(ExchangeTopic)
                .Build()
        ).ConfigureAwait(false);

        await Client.StartAsync(options).ConfigureAwait(false);

        Running = true;
    }

    /// <summary>Unhooks the client events and stops the managed client.</summary>
    private async Task StopClient()
    {
        UnbindClientHandlers();

        await Client.StopAsync().ConfigureAwait(false);
    }

    /// <summary>
    ///     Hooks the relay handlers onto the managed client. Safe to call repeatedly: existing
    ///     hooks are removed first, so each client event is relayed exactly once.
    /// </summary>
    public void BindClientHandlers()
    {
        // Removing a handler that is not attached is a no-op, which makes the binding idempotent
        // when Start is invoked more than once.
        UnbindClientHandlers();

        Client.Connected += ClientOnConnected;
        Client.Disconnected += ClientOnDisconnected;
        Client.ConnectingFailed += ClientOnConnectingFailed;
        Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
    }

    /// <summary>Removes the relay handlers from the managed client.</summary>
    public void UnbindClientHandlers()
    {
        Client.Connected -= ClientOnConnected;
        Client.Disconnected -= ClientOnDisconnected;
        Client.ConnectingFailed -= ClientOnConnectingFailed;
        Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
    }

    /// <summary>
    ///     Runs <paramref name="raise" /> on <see cref="TaskScheduler" />, honouring
    ///     <see cref="CancellationToken" />. A cancellation caused by that token is treated as a
    ///     normal shutdown; any other exception is rethrown to the awaiting relay handler.
    /// </summary>
    /// <param name="raise">Callback that invokes one of the public events.</param>
    private async Task RaiseOnScheduler(Action raise)
    {
        try
        {
            await Task.Factory
                .StartNew(raise, CancellationToken, TaskCreationOptions.None, TaskScheduler)
                .ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (CancellationToken.IsCancellationRequested)
        {
            // The token was cancelled, so the event is intentionally dropped. Letting the
            // cancellation escape an async void handler would surface as an unhandled exception.
        }
    }

    private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        await RaiseOnScheduler(() => OnMessageReceived?.Invoke(sender, e)).ConfigureAwait(false);
    }

    private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientConnectionFailed?.Invoke(sender, e)).ConfigureAwait(false);
    }

    private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientDisconnected?.Invoke(sender, e)).ConfigureAwait(false);
    }

    private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientConnected?.Invoke(sender, e)).ConfigureAwait(false);
    }

    /// <summary>
    ///     Hooks the server events and starts the server on the stored address and port with the
    ///     subscription interceptor installed.
    /// </summary>
    private async Task StartServer()
    {
        var serverOptions = new MqttServerOptionsBuilder()
            .WithDefaultEndpointBoundIPAddress(IPAddress)
            .WithSubscriptionInterceptor(MQTTSubscriptionIntercept)
            .WithDefaultEndpointPort(Port)
            .Build();

        BindServerHandlers();

        await Server.StartAsync(serverOptions).ConfigureAwait(false);

        Running = true;
    }

    /// <summary>Unhooks the server events and stops the server.</summary>
    private async Task StopServer()
    {
        UnbindServerHandlers();

        await Server.StopAsync().ConfigureAwait(false);
    }

    /// <summary>
    ///     Accepts subscriptions to the lobby and exchange topics only; a request for any other
    ///     topic is refused and the context is flagged to close the connection.
    /// </summary>
    private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context)
    {
        var topic = context.TopicFilter.Topic;
        var allowed = topic == LobbyTopic || topic == ExchangeTopic;

        context.AcceptSubscription = allowed;
        context.CloseConnection = !allowed;
    }

    /// <summary>
    ///     Hooks the relay handlers onto the server. Existing hooks are removed first, so each
    ///     server event is relayed exactly once even after repeated starts.
    /// </summary>
    private void BindServerHandlers()
    {
        UnbindServerHandlers();

        Server.Started += ServerOnStarted;
        Server.Stopped += ServerOnStopped;
        Server.ClientConnected += ServerOnClientConnected;
        Server.ClientDisconnected += ServerOnClientDisconnected;
        Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
        Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
        Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
    }

    /// <summary>Removes the relay handlers from the server.</summary>
    private void UnbindServerHandlers()
    {
        Server.Started -= ServerOnStarted;
        Server.Stopped -= ServerOnStopped;
        Server.ClientConnected -= ServerOnClientConnected;
        Server.ClientDisconnected -= ServerOnClientDisconnected;
        Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
        Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
        Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
    }

    private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        await RaiseOnScheduler(() => OnMessageReceived?.Invoke(sender, e)).ConfigureAwait(false);
    }

    private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientUnsubscribed?.Invoke(sender, e)).ConfigureAwait(false);
    }

    private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
    {
        await RaiseOnScheduler(() => OnClientSubscribed?.Invoke(sender, e)).ConfigureAwait(false);
    }

    private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnServerClientDisconnected?.Invoke(sender, e)).ConfigureAwait(false);
    }

    private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
    {
        await RaiseOnScheduler(() => OnServerClientConnected?.Invoke(sender, e)).ConfigureAwait(false);
    }

    // NOTE(review): unlike the other relays, Started/Stopped are raised directly on the thread
    // the server raises them on rather than on TaskScheduler - confirm this is intended.
    private void ServerOnStopped(object sender, EventArgs e)
    {
        OnServerStopped?.Invoke(sender, e);
    }

    private void ServerOnStarted(object sender, EventArgs e)
    {
        OnServerStarted?.Invoke(sender, e);
    }

    /// <summary>
    ///     Stops the side selected by <see cref="Type" /> (after unhooking its events) and clears
    ///     <see cref="Running" />.
    /// </summary>
    public async Task Stop()
    {
        switch (Type)
        {
            case MQTTCommunicationType.Server:
                await StopServer().ConfigureAwait(false);
                break;
            case MQTTCommunicationType.Client:
                await StopClient().ConfigureAwait(false);
                break;
        }

        Running = false;
    }

    /// <summary>
    ///     Publishes <paramref name="payload" /> on <paramref name="topic" /> through the client
    ///     or the server, depending on <see cref="Type" />. Does nothing for any other type.
    /// </summary>
    /// <param name="topic">Topic to publish on.</param>
    /// <param name="payload">Raw message body.</param>
    public async Task Broadcast(string topic, byte[] payload)
    {
        switch (Type)
        {
            case MQTTCommunicationType.Client:
                await Client.PublishAsync(new ManagedMqttApplicationMessage
                {
                    ApplicationMessage = new MqttApplicationMessage {Topic = topic, Payload = payload}
                }).ConfigureAwait(false);
                break;
            case MQTTCommunicationType.Server:
                await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = payload})
                    .ConfigureAwait(false);
                break;
        }
    }
}
||
7 | office | 312 | } |