WingMan – Blame information for rev 7

Subversion Repositories:
Rev:
Rev Author Line No. Line
5 office 1 using System;
2 using System.Net;
6 office 3 using System.Threading;
5 office 4 using System.Threading.Tasks;
5 using MQTTnet;
6 using MQTTnet.Client;
7 using MQTTnet.Extensions.ManagedClient;
8 using MQTTnet.Server;
9 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
10 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
11  
12 namespace WingMan.Communication
13 {
7 office 14 public class MQTTCommunication : IDisposable
5 office 15 {
7 office 16 public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);
17  
18 public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);
19  
20 public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);
21  
22 public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);
23  
24 public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);
25  
26 public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);
27  
28 public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);
29  
30 public delegate void ServerClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e);
31  
32 public delegate void ServerStarted(object sender, EventArgs e);
33  
34 public delegate void ServerStopped(object sender, EventArgs e);
35  
36 public MQTTCommunication(TaskScheduler taskScheduler, CancellationToken cancellationToken)
5 office 37 {
6 office 38 TaskScheduler = taskScheduler;
7 office 39 CancellationToken = cancellationToken;
5 office 40  
41 Client = new MqttFactory().CreateManagedMqttClient();
42 Server = new MqttFactory().CreateMqttServer();
43 }
44  
7 office 45 private TaskScheduler TaskScheduler { get; }
6 office 46  
5 office 47 private IManagedMqttClient Client { get; }
48  
49 private IMqttServer Server { get; }
50  
51 public bool Running { get; set; }
52  
53 public string Nick { get; set; }
54  
55 private IPAddress IPAddress { get; set; }
56  
57 private int Port { get; set; }
58  
7 office 59 private CancellationToken CancellationToken { get; }
6 office 60  
5 office 61 public MQTTCommunicationType Type { get; set; }
62  
7 office 63 public async void Dispose()
64 {
65 await Stop();
66 }
5 office 67  
68 public event MessageReceived OnMessageReceived;
69  
70 public event ClientConnected OnClientConnected;
71  
72 public event ClientDisconnected OnClientDisconnected;
73  
74 public event ClientConnectionFailed OnClientConnectionFailed;
75  
76 public event ClientUnsubscribed OnClientUnsubscribed;
77  
78 public event ClientSubscribed OnClientSubscribed;
79  
80 public event ServerClientDisconnected OnServerClientDisconnected;
81  
82 public event ServerClientConnected OnServerClientConnected;
83  
84 public event ServerStarted OnServerStarted;
85  
86 public event ServerStopped OnServerStopped;
87  
88 public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick)
89 {
90 Type = type;
91 IPAddress = ipAddress;
92 Port = port;
93 Nick = nick;
94  
95 switch (type)
96 {
97 case MQTTCommunicationType.Client:
98 await StartClient().ConfigureAwait(false);
99 break;
100 case MQTTCommunicationType.Server:
101 await StartServer().ConfigureAwait(false);
102 break;
103 }
104 }
105  
106 private async Task StartClient()
107 {
108 var clientOptions = new MqttClientOptionsBuilder()
109 .WithTcpServer(IPAddress.ToString(), Port);
110  
111 // Setup and start a managed MQTT client.
112 var options = new ManagedMqttClientOptionsBuilder()
113 .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
114 .WithClientOptions(clientOptions.Build())
115 .Build();
116  
117 BindClientHandlers();
118  
119 await Client.SubscribeAsync(
120 new TopicFilterBuilder()
121 .WithTopic("lobby")
122 .Build()
123 ).ConfigureAwait(false);
124  
125 await Client.SubscribeAsync(
126 new TopicFilterBuilder()
127 .WithTopic("exchange")
128 .Build()
129 ).ConfigureAwait(false);
130  
131 await Client.StartAsync(options).ConfigureAwait(false);
132  
133 Running = true;
134 }
135  
136 private async Task StopClient()
137 {
138 UnbindClientHandlers();
139  
140 await Client.StopAsync().ConfigureAwait(false);
141 }
142  
143 public void BindClientHandlers()
144 {
145 Client.Connected += ClientOnConnected;
146 Client.Disconnected += ClientOnDisconnected;
147 Client.ConnectingFailed += ClientOnConnectingFailed;
148 Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
149 }
150  
151 public void UnbindClientHandlers()
152 {
153 Client.Connected -= ClientOnConnected;
154 Client.Disconnected -= ClientOnDisconnected;
155 Client.ConnectingFailed -= ClientOnConnectingFailed;
156 Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
157 }
158  
6 office 159 private async void ClientOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
5 office 160 {
6 office 161 await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
7 office 162 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
5 office 163 }
164  
6 office 165 private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
5 office 166 {
6 office 167 await Task.Delay(0).ContinueWith(_ => OnClientConnectionFailed?.Invoke(sender, e),
7 office 168 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 169 }
170  
6 office 171 private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
5 office 172 {
6 office 173 await Task.Delay(0).ContinueWith(_ => OnClientDisconnected?.Invoke(sender, e),
7 office 174 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 175 }
176  
6 office 177 private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
5 office 178 {
6 office 179 await Task.Delay(0).ContinueWith(_ => OnClientConnected?.Invoke(sender, e),
7 office 180 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 181 }
182  
183 private async Task StartServer()
184 {
185 var optionsBuilder = new MqttServerOptionsBuilder()
186 .WithDefaultEndpointBoundIPAddress(IPAddress)
187 .WithSubscriptionInterceptor(MQTTSubscriptionIntercept)
188 .WithDefaultEndpointPort(Port);
189  
190 BindServerHandlers();
191  
192 await Server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false);
193  
194 Running = true;
195 }
196  
197 private async Task StopServer()
198 {
199 UnbindServerHandlers();
200  
201 await Server.StopAsync().ConfigureAwait(false);
202 }
203  
204 private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context)
205 {
206 if (context.TopicFilter.Topic != "lobby" &&
207 context.TopicFilter.Topic != "exchange")
208 {
209 context.AcceptSubscription = false;
210 context.CloseConnection = true;
211 return;
212 }
213  
214 context.AcceptSubscription = true;
215 context.CloseConnection = false;
216 }
217  
218 private void BindServerHandlers()
219 {
220 Server.Started += ServerOnStarted;
221 Server.Stopped += ServerOnStopped;
222 Server.ClientConnected += ServerOnClientConnected;
223 Server.ClientDisconnected += ServerOnClientDisconnected;
224 Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
225 Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
226 Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
227 }
228  
229 private void UnbindServerHandlers()
230 {
231 Server.Started -= ServerOnStarted;
232 Server.Stopped -= ServerOnStopped;
233 Server.ClientConnected -= ServerOnClientConnected;
234 Server.ClientDisconnected -= ServerOnClientDisconnected;
235 Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
236 Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
237 Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
238 }
239  
6 office 240 private async void ServerOnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
5 office 241 {
6 office 242 await Task.Delay(0).ContinueWith(_ => OnMessageReceived?.Invoke(sender, e),
7 office 243 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 244 }
245  
6 office 246 private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
5 office 247 {
6 office 248 await Task.Delay(0).ContinueWith(_ => OnClientUnsubscribed?.Invoke(sender, e),
7 office 249 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 250 }
251  
6 office 252 private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
5 office 253 {
6 office 254 await Task.Delay(0).ContinueWith(_ => OnClientSubscribed?.Invoke(sender, e),
7 office 255 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 256 }
257  
6 office 258 private async void ServerOnClientDisconnected(object sender, MQTTnet.Server.MqttClientDisconnectedEventArgs e)
5 office 259 {
6 office 260 await Task.Delay(0).ContinueWith(_ => OnServerClientDisconnected?.Invoke(sender, e),
7 office 261 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 262 }
263  
6 office 264 private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
5 office 265 {
6 office 266 await Task.Delay(0).ContinueWith(_ => OnServerClientConnected?.Invoke(sender, e),
7 office 267 CancellationToken, TaskContinuationOptions.None, TaskScheduler).ConfigureAwait(false);
5 office 268 }
269  
270 private void ServerOnStopped(object sender, EventArgs e)
271 {
272 OnServerStopped?.Invoke(sender, e);
273 }
274  
275 private void ServerOnStarted(object sender, EventArgs e)
276 {
277 OnServerStarted?.Invoke(sender, e);
278 }
279  
280 public async Task Stop()
281 {
282 switch (Type)
283 {
284 case MQTTCommunicationType.Server:
285 await StopServer().ConfigureAwait(false);
286 break;
287 case MQTTCommunicationType.Client:
288 await StopClient().ConfigureAwait(false);
289 break;
290 }
291  
292 Running = false;
293 }
294  
295 public async Task Broadcast(string topic, byte[] payload)
296 {
297 switch (Type)
298 {
299 case MQTTCommunicationType.Client:
300 await Client.PublishAsync(new ManagedMqttApplicationMessage
301 {
7 office 302 ApplicationMessage = new MqttApplicationMessage {Topic = topic, Payload = payload}
5 office 303 }).ConfigureAwait(false);
304 break;
305 case MQTTCommunicationType.Server:
7 office 306 await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = payload})
307 .ConfigureAwait(false);
5 office 308 break;
309 }
310 }
311 }
7 office 312 }