WingMan – Diff between revs 5 and 6

Subversion Repositories:
Rev:
Only display areas with differencesIgnore whitespace
Rev 5 Rev 6
1 using System; 1 using System;
2 using System.Net; 2 using System.Net;
-   3 using System.Threading;
3 using System.Threading.Tasks; 4 using System.Threading.Tasks;
4 using MQTTnet; 5 using MQTTnet;
5 using MQTTnet.Client; 6 using MQTTnet.Client;
6 using MQTTnet.Extensions.ManagedClient; 7 using MQTTnet.Extensions.ManagedClient;
7 using MQTTnet.Server; 8 using MQTTnet.Server;
8 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; 9 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
9 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; 10 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
10   11  
11 namespace WingMan.Communication 12 namespace WingMan.Communication
12 { 13 {
13 public class MQTTCommunication 14 public class MQTTCommunication
14 { 15 {
15 public MQTTCommunication(IPAddress ipAddress, int port, string nick) :this() 16 public MQTTCommunication(TaskScheduler taskScheduler)
16 { 17 {
17 IPAddress = ipAddress; 18 TaskScheduler = taskScheduler;
18 Port = port; -  
19 Nick = nick; 19 CancellationTokenSource = new CancellationTokenSource();
20 } -  
21   -  
22 public MQTTCommunication() -  
23 { 20  
24 Client = new MqttFactory().CreateManagedMqttClient(); 21 Client = new MqttFactory().CreateManagedMqttClient();
25 Server = new MqttFactory().CreateMqttServer(); 22 Server = new MqttFactory().CreateMqttServer();
26 } 23 }
-   24  
-   25 private TaskScheduler TaskScheduler { get; set; }
27   26  
28 private IManagedMqttClient Client { get; } 27 private IManagedMqttClient Client { get; }
29   28  
30 private IMqttServer Server { get; } 29 private IMqttServer Server { get; }
31   30  
32 public bool Running { get; set; } 31 public bool Running { get; set; }
33   32  
34 public string Nick { get; set; } 33 public string Nick { get; set; }
35   34  
36 private IPAddress IPAddress { get; set; } 35 private IPAddress IPAddress { get; set; }
37   36  
38 private int Port { get; set; } 37 private int Port { get; set; }
-   38  
-   39 private CancellationTokenSource CancellationTokenSource { get; set; }
39   40  
40 public MQTTCommunicationType Type { get; set; } 41 public MQTTCommunicationType Type { get; set; }
41   42  
42 public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e); 43 public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);
43   44  
44 public event MessageReceived OnMessageReceived; 45 public event MessageReceived OnMessageReceived;
45   46  
46 public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e); 47 public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);
47   48  
48 public event ClientConnected OnClientConnected; 49 public event ClientConnected OnClientConnected;
49   50  
50 public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e); 51 public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);
51   52  
52 public event ClientDisconnected OnClientDisconnected; 53 public event ClientDisconnected OnClientDisconnected;
53   54  
54 public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e); 55 public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);
55   56  
56 public event ClientConnectionFailed OnClientConnectionFailed; 57 public event ClientConnectionFailed OnClientConnectionFailed;
57   58  
58 public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e); 59 public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);
59   60  
60 public event ClientUnsubscribed OnClientUnsubscribed; 61 public event ClientUnsubscribed OnClientUnsubscribed;
61   62  
62 public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e); 63 public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);
63   64  
64 public event ClientSubscribed OnClientSubscribed; 65 public event ClientSubscribed OnClientSubscribed;
65   66  
66 public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e); 67 public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);
67   68  
68 public event ServerClientDisconnected OnServerClientDisconnected; 69 public event ServerClientDisconnected OnServerClientDisconnected;
69   70  
70 public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e); 71 public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);
71   72  
72 public event ServerClientConnected OnServerClientConnected; 73 public event ServerClientConnected OnServerClientConnected;
73   74  
74 public delegate void ServerStarted(object sender, EventArgs e); 75 public delegate void ServerStarted(object sender, EventArgs e);
75   76  
76 public event ServerStarted OnServerStarted; 77 public event ServerStarted OnServerStarted;
77   78  
78 public delegate void ServerStopped(object sender, EventArgs e); 79 public delegate void ServerStopped(object sender, EventArgs e);
79   80  
80 public event ServerStopped OnServerStopped; 81 public event ServerStopped OnServerStopped;
81   82  
82 public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick) 83 public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick)
83 { 84 {
84 Type = type; 85 Type = type;
85 IPAddress = ipAddress; 86 IPAddress = ipAddress;
86 Port = port; 87 Port = port;
87 Nick = nick; 88 Nick = nick;
88   89  
89 switch (type) 90 switch (type)
90 { 91 {
91 case MQTTCommunicationType.Client: 92 case MQTTCommunicationType.Client:
92 await StartClient().ConfigureAwait(false); 93 await StartClient().ConfigureAwait(false);
93 break; 94 break;
94 case MQTTCommunicationType.Server: 95 case MQTTCommunicationType.Server:
95 await StartServer().ConfigureAwait(false); 96 await StartServer().ConfigureAwait(false);
96 break; 97 break;
97 } 98 }
98 } 99 }
99   100  
100 private async Task StartClient() 101 private async Task StartClient()
101 { 102 {
102 var clientOptions = new MqttClientOptionsBuilder() 103 var clientOptions = new MqttClientOptionsBuilder()
103 .WithTcpServer(IPAddress.ToString(), Port); 104 .WithTcpServer(IPAddress.ToString(), Port);
104   105  
105 // Setup and start a managed MQTT client. 106 // Setup and start a managed MQTT client.
106 var options = new ManagedMqttClientOptionsBuilder() 107 var options = new ManagedMqttClientOptionsBuilder()
107 .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) 108 .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
108 .WithClientOptions(clientOptions.Build()) 109 .WithClientOptions(clientOptions.Build())
109 .Build(); 110 .Build();
110   111  
111 BindClientHandlers(); 112 BindClientHandlers();
112   113  
113 await Client.SubscribeAsync( 114 await Client.SubscribeAsync(
114 new TopicFilterBuilder() 115 new TopicFilterBuilder()
115 .WithTopic("lobby") 116 .WithTopic("lobby")
116 .Build() 117 .Build()
117 ).ConfigureAwait(false); 118 ).ConfigureAwait(false);
118   119  
119 await Client.SubscribeAsync( 120 await Client.SubscribeAsync(
120 new TopicFilterBuilder() 121 new TopicFilterBuilder()
121 .WithTopic("exchange") 122 .WithTopic("exchange")
122 .Build() 123 .Build()
123 ).ConfigureAwait(false); 124 ).ConfigureAwait(false);
124   125  
125 await Client.StartAsync(options).ConfigureAwait(false); 126 await Client.StartAsync(options).ConfigureAwait(false);
126   127  
127 Running = true; 128 Running = true;
128 } 129 }
129   130  
130 private async Task StopClient() 131 private async Task StopClient()
131 { 132 {
132 UnbindClientHandlers(); 133 UnbindClientHandlers();
133   134  
134 await Client.StopAsync().ConfigureAwait(false); 135 await Client.StopAsync().ConfigureAwait(false);
135 } 136 }
136   137  
137 public void BindClientHandlers() 138 public void BindClientHandlers()
138 { 139 {
139 Client.Connected += ClientOnConnected; 140 Client.Connected += ClientOnConnected;
140 Client.Disconnected += ClientOnDisconnected; 141 Client.Disconnected += ClientOnDisconnected;
141 Client.ConnectingFailed += ClientOnConnectingFailed; 142 Client.ConnectingFailed += ClientOnConnectingFailed;
142 Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived; 143 Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
143 } 144 }
144   145  
145 public void UnbindClientHandlers() 146 public void UnbindClientHandlers()
146 { 147 {
147 Client.Connected -= ClientOnConnected; 148 Client.Connected -= ClientOnConnected;
148 Client.Disconnected -= ClientOnDisconnected; 149 Client.Disconnected -= ClientOnDisconnected;
149 Client.ConnectingFailed -= ClientOnConnectingFailed; 150 Client.ConnectingFailed -= ClientOnConnectingFailed;
150 Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived; 151 Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
151 } 152 }
152   153  
153 private void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 154 private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
154 { 155 {
-   156 await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
155 OnMessageReceived?.Invoke(sender, e); 157 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler);
156 } 158 }
157   159  
158 private void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e) 160 private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
-   161 {
159 { 162 await Task.Delay(0).ContinueWith(_ => OnClientConnectionFailed?.Invoke(sender, e),
160 OnClientConnectionFailed?.Invoke(sender, e); 163 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
161 } 164 }
162   165  
-   166 private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
163 private void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e) 167 {
164 { 168 await Task.Delay(0).ContinueWith(_ => OnClientDisconnected?.Invoke(sender, e),
165 OnClientDisconnected?.Invoke(sender, e); 169 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
166 } 170 }
-   171  
167   172 private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
168 private void ClientOnConnected(object sender, MqttClientConnectedEventArgs e) 173 {
169 { 174 await Task.Delay(0).ContinueWith(_ => OnClientConnected?.Invoke(sender, e),
170 OnClientConnected?.Invoke(sender, e); 175 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
171 } 176 }
172   177  
173 private async Task StartServer() 178 private async Task StartServer()
174 { 179 {
175 var optionsBuilder = new MqttServerOptionsBuilder() 180 var optionsBuilder = new MqttServerOptionsBuilder()
176 .WithDefaultEndpointBoundIPAddress(IPAddress) 181 .WithDefaultEndpointBoundIPAddress(IPAddress)
177 .WithSubscriptionInterceptor(MQTTSubscriptionIntercept) 182 .WithSubscriptionInterceptor(MQTTSubscriptionIntercept)
178 .WithDefaultEndpointPort(Port); 183 .WithDefaultEndpointPort(Port);
179   184  
180 BindServerHandlers(); 185 BindServerHandlers();
181   186  
182 await Server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false); 187 await Server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false);
183   188  
184 Running = true; 189 Running = true;
185 } 190 }
186   191  
187 private async Task StopServer() 192 private async Task StopServer()
188 { 193 {
189 UnbindServerHandlers(); 194 UnbindServerHandlers();
190   195  
191 await Server.StopAsync().ConfigureAwait(false); 196 await Server.StopAsync().ConfigureAwait(false);
192 } 197 }
193   198  
194 private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context) 199 private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context)
195 { 200 {
196 if (context.TopicFilter.Topic != "lobby" && 201 if (context.TopicFilter.Topic != "lobby" &&
197 context.TopicFilter.Topic != "exchange") 202 context.TopicFilter.Topic != "exchange")
198 { 203 {
199 context.AcceptSubscription = false; 204 context.AcceptSubscription = false;
200 context.CloseConnection = true; 205 context.CloseConnection = true;
201 return; 206 return;
202 } 207 }
203   208  
204 context.AcceptSubscription = true; 209 context.AcceptSubscription = true;
205 context.CloseConnection = false; 210 context.CloseConnection = false;
206 } 211 }
207   212  
208 private void BindServerHandlers() 213 private void BindServerHandlers()
209 { 214 {
210 Server.Started += ServerOnStarted; 215 Server.Started += ServerOnStarted;
211 Server.Stopped += ServerOnStopped; 216 Server.Stopped += ServerOnStopped;
212 Server.ClientConnected += ServerOnClientConnected; 217 Server.ClientConnected += ServerOnClientConnected;
213 Server.ClientDisconnected += ServerOnClientDisconnected; 218 Server.ClientDisconnected += ServerOnClientDisconnected;
214 Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic; 219 Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
215 Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic; 220 Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
216 Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived; 221 Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
217 } 222 }
218   223  
219 private void UnbindServerHandlers() 224 private void UnbindServerHandlers()
220 { 225 {
221 Server.Started -= ServerOnStarted; 226 Server.Started -= ServerOnStarted;
222 Server.Stopped -= ServerOnStopped; 227 Server.Stopped -= ServerOnStopped;
223 Server.ClientConnected -= ServerOnClientConnected; 228 Server.ClientConnected -= ServerOnClientConnected;
224 Server.ClientDisconnected -= ServerOnClientDisconnected; 229 Server.ClientDisconnected -= ServerOnClientDisconnected;
225 Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic; 230 Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
226 Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic; 231 Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
227 Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived; 232 Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
228 } 233 }
229   234  
230 private void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 235 private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
231 { 236 {
-   237 await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
232 OnMessageReceived?.Invoke(sender, e); 238 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
233 } 239 }
234   240  
235 private void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e) 241 private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
-   242 {
236 { 243 await Task.Delay(0).ContinueWith(_ => OnClientUnsubscribed?.Invoke(sender, e),
237 OnClientUnsubscribed?.Invoke(sender, e); 244 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
238 } 245 }
239   246  
-   247 private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
240 private void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e) 248 {
241 { 249 await Task.Delay(0).ContinueWith(_ => OnClientSubscribed?.Invoke(sender, e),
242 OnClientSubscribed?.Invoke(sender, e); 250 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
243 } 251 }
-   252  
244   253 private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
245 private void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e) 254 {
246 { 255 await Task.Delay(0).ContinueWith(_ => OnServerClientDisconnected?.Invoke(sender, e),
247 OnServerClientDisconnected?.Invoke(sender, e); 256 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
-   257 }
248 } 258  
249   259 private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
250 private void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e) 260 {
251 { 261 await Task.Delay(0).ContinueWith(_ => OnServerClientConnected?.Invoke(sender, e),
252 OnServerClientConnected?.Invoke(sender, e); 262 CancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
253 } 263 }
254   264  
255 private void ServerOnStopped(object sender, EventArgs e) 265 private void ServerOnStopped(object sender, EventArgs e)
256 { 266 {
257 OnServerStopped?.Invoke(sender, e); 267 OnServerStopped?.Invoke(sender, e);
258 } 268 }
259   269  
260 private void ServerOnStarted(object sender, EventArgs e) 270 private void ServerOnStarted(object sender, EventArgs e)
261 { 271 {
262 OnServerStarted?.Invoke(sender, e); 272 OnServerStarted?.Invoke(sender, e);
263 } 273 }
264   274  
265 public async Task Stop() 275 public async Task Stop()
266 { 276 {
267 switch (Type) 277 switch (Type)
268 { 278 {
269 case MQTTCommunicationType.Server: 279 case MQTTCommunicationType.Server:
270 await StopServer().ConfigureAwait(false); 280 await StopServer().ConfigureAwait(false);
271 break; 281 break;
272 case MQTTCommunicationType.Client: 282 case MQTTCommunicationType.Client:
273 await StopClient().ConfigureAwait(false); 283 await StopClient().ConfigureAwait(false);
274 break; 284 break;
275 } 285 }
276   286  
277 Running = false; 287 Running = false;
278 } 288 }
279   289  
280 public async Task Broadcast(string topic, byte[] payload) 290 public async Task Broadcast(string topic, byte[] payload)
281 { 291 {
282 switch (Type) 292 switch (Type)
283 { 293 {
284 case MQTTCommunicationType.Client: 294 case MQTTCommunicationType.Client:
285 await Client.PublishAsync(new ManagedMqttApplicationMessage 295 await Client.PublishAsync(new ManagedMqttApplicationMessage
286 { 296 {
287 ApplicationMessage = new MqttApplicationMessage { Topic = topic, Payload = payload } 297 ApplicationMessage = new MqttApplicationMessage { Topic = topic, Payload = payload }
288 }).ConfigureAwait(false); 298 }).ConfigureAwait(false);
289 break; 299 break;
290 case MQTTCommunicationType.Server: 300 case MQTTCommunicationType.Server:
291 await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = payload}).ConfigureAwait(false); 301 await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = payload}).ConfigureAwait(false);
292 break; 302 break;
293 } 303 }
294 } 304 }
295 } 305 }
296 } 306 }
297   307