WingMan – Blame information for rev 6

Subversion Repositories:
Rev:
Rev Author Line No. Line
5 office 1 using System;
2 using System.Net;
6 office 3 using System.Threading;
5 office 4 using System.Threading.Tasks;
5 using MQTTnet;
6 using MQTTnet.Client;
7 using MQTTnet.Extensions.ManagedClient;
8 using MQTTnet.Server;
9 using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
10 using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;
11  
namespace WingMan.Communication
{
    /// <summary>
    ///     Wraps an MQTTnet managed client and an MQTTnet server behind a single type and
    ///     re-raises their events on the <see cref="System.Threading.Tasks.TaskScheduler" />
    ///     supplied at construction.
    /// </summary>
    public class MQTTCommunication
    {
        public delegate void ClientConnected(object sender, MqttClientConnectedEventArgs e);

        public delegate void ClientConnectionFailed(object sender, MqttManagedProcessFailedEventArgs e);

        public delegate void ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e);

        public delegate void ClientSubscribed(object sender, MqttClientSubscribedTopicEventArgs e);

        public delegate void ClientUnsubscribed(object sender, MqttClientUnsubscribedTopicEventArgs e);

        public delegate void MessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e);

        public delegate void ServerClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e);

        public delegate void ServerClientDisconnected(object sender,
            MQTTnet.Server.MqttClientDisconnectedEventArgs e);

        public delegate void ServerStarted(object sender, EventArgs e);

        public delegate void ServerStopped(object sender, EventArgs e);

        // The only two topics this class subscribes to (client) and accepts subscriptions for (server).
        private const string LobbyTopic = "lobby";
        private const string ExchangeTopic = "exchange";

        /// <summary>
        ///     Creates the managed client and the server instances; neither is started here.
        /// </summary>
        /// <param name="taskScheduler">Scheduler on which all forwarded events are raised.</param>
        /// <exception cref="ArgumentNullException"><paramref name="taskScheduler" /> is null.</exception>
        public MQTTCommunication(TaskScheduler taskScheduler)
        {
            // Fail here rather than inside an async void event forwarder later on.
            if (taskScheduler == null)
            {
                throw new ArgumentNullException(nameof(taskScheduler));
            }

            TaskScheduler = taskScheduler;
            CancellationTokenSource = new CancellationTokenSource();

            Client = new MqttFactory().CreateManagedMqttClient();
            Server = new MqttFactory().CreateMqttServer();
        }

        private TaskScheduler TaskScheduler { get; }

        private IManagedMqttClient Client { get; }

        private IMqttServer Server { get; }

        /// <summary>Set to true once a start has completed and back to false by <see cref="Stop" />.</summary>
        public bool Running { get; set; }

        /// <summary>Nickname passed to the last call of <see cref="Start" />.</summary>
        public string Nick { get; set; }

        private IPAddress IPAddress { get; set; }

        private int Port { get; set; }

        private CancellationTokenSource CancellationTokenSource { get; }

        /// <summary>Mode passed to the last call of <see cref="Start" />.</summary>
        public MQTTCommunicationType Type { get; set; }

        /// <summary>Raised for application messages reported by the client or by the server.</summary>
        public event MessageReceived OnMessageReceived;

        /// <summary>Raised when the managed client reports that it connected.</summary>
        public event ClientConnected OnClientConnected;

        /// <summary>Raised when the managed client reports that it disconnected.</summary>
        public event ClientDisconnected OnClientDisconnected;

        /// <summary>Raised when the managed client reports a failed connection attempt.</summary>
        public event ClientConnectionFailed OnClientConnectionFailed;

        /// <summary>Raised when the server reports that a client unsubscribed from a topic.</summary>
        public event ClientUnsubscribed OnClientUnsubscribed;

        /// <summary>Raised when the server reports that a client subscribed to a topic.</summary>
        public event ClientSubscribed OnClientSubscribed;

        /// <summary>Raised when the server reports that a client disconnected.</summary>
        public event ServerClientDisconnected OnServerClientDisconnected;

        /// <summary>Raised when the server reports that a client connected.</summary>
        public event ServerClientConnected OnServerClientConnected;

        /// <summary>Raised when the server reports that it started.</summary>
        public event ServerStarted OnServerStarted;

        /// <summary>Raised when the server reports that it stopped.</summary>
        public event ServerStopped OnServerStopped;

        /// <summary>
        ///     Stores the connection parameters and starts the client or the server depending on
        ///     <paramref name="type" />. Any other value only stores the parameters.
        /// </summary>
        /// <param name="type">Whether to run as client or as server.</param>
        /// <param name="ipAddress">Address to connect to (client) or to bind to (server).</param>
        /// <param name="port">Port to connect to (client) or to listen on (server).</param>
        /// <param name="nick">Nickname stored in <see cref="Nick" />.</param>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="ipAddress" /> is null while starting as a client.
        /// </exception>
        public async Task Start(MQTTCommunicationType type, IPAddress ipAddress, int port, string nick)
        {
            // The client path dereferences the address; report a null clearly instead of
            // failing with a NullReferenceException. The server path is left untouched
            // because the address is handed to the library as-is there.
            if (type == MQTTCommunicationType.Client && ipAddress == null)
            {
                throw new ArgumentNullException(nameof(ipAddress));
            }

            Type = type;
            IPAddress = ipAddress;
            Port = port;
            Nick = nick;

            switch (type)
            {
                case MQTTCommunicationType.Client:
                    await StartClient().ConfigureAwait(false);
                    break;
                case MQTTCommunicationType.Server:
                    await StartServer().ConfigureAwait(false);
                    break;
            }
        }

        /// <summary>
        ///     Stops whichever side <see cref="Type" /> selects and clears <see cref="Running" />.
        /// </summary>
        public async Task Stop()
        {
            switch (Type)
            {
                case MQTTCommunicationType.Server:
                    await StopServer().ConfigureAwait(false);
                    break;
                case MQTTCommunicationType.Client:
                    await StopClient().ConfigureAwait(false);
                    break;
            }

            Running = false;
        }

        /// <summary>
        ///     Publishes <paramref name="payload" /> on <paramref name="topic" /> through the client
        ///     or the server, depending on <see cref="Type" />.
        /// </summary>
        /// <param name="topic">Topic to publish on.</param>
        /// <param name="payload">Raw message payload.</param>
        public async Task Broadcast(string topic, byte[] payload)
        {
            switch (Type)
            {
                case MQTTCommunicationType.Client:
                    await Client.PublishAsync(new ManagedMqttApplicationMessage
                    {
                        ApplicationMessage = new MqttApplicationMessage {Topic = topic, Payload = payload}
                    }).ConfigureAwait(false);
                    break;
                case MQTTCommunicationType.Server:
                    await Server.PublishAsync(new MqttApplicationMessage {Topic = topic, Payload = payload})
                        .ConfigureAwait(false);
                    break;
            }
        }

        /// <summary>
        ///     Attaches this instance's forwarders to the managed client's events. Safe to call
        ///     repeatedly: existing attachments are removed first so no event is delivered twice.
        /// </summary>
        public void BindClientHandlers()
        {
            UnbindClientHandlers();

            Client.Connected += ClientOnConnected;
            Client.Disconnected += ClientOnDisconnected;
            Client.ConnectingFailed += ClientOnConnectingFailed;
            Client.ApplicationMessageReceived += ClientOnApplicationMessageReceived;
        }

        /// <summary>Detaches this instance's forwarders from the managed client's events.</summary>
        public void UnbindClientHandlers()
        {
            Client.Connected -= ClientOnConnected;
            Client.Disconnected -= ClientOnDisconnected;
            Client.ConnectingFailed -= ClientOnConnectingFailed;
            Client.ApplicationMessageReceived -= ClientOnApplicationMessageReceived;
        }

        /// <summary>
        ///     Queues <paramref name="callback" /> on <see cref="TaskScheduler" />, observing the
        ///     instance-wide cancellation token. Shared by every event forwarder below.
        /// </summary>
        private Task RunOnScheduler(Action callback)
        {
            return Task.Factory.StartNew(callback, CancellationTokenSource.Token, TaskCreationOptions.None,
                TaskScheduler);
        }

        private async Task StartClient()
        {
            var clientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(IPAddress.ToString(), Port);

            // Setup and start a managed MQTT client.
            var options = new ManagedMqttClientOptionsBuilder()
                .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                .WithClientOptions(clientOptions.Build())
                .Build();

            BindClientHandlers();

            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic(LobbyTopic)
                    .Build()
            ).ConfigureAwait(false);

            await Client.SubscribeAsync(
                new TopicFilterBuilder()
                    .WithTopic(ExchangeTopic)
                    .Build()
            ).ConfigureAwait(false);

            await Client.StartAsync(options).ConfigureAwait(false);

            Running = true;
        }

        private async Task StopClient()
        {
            // Forwarders are detached before stopping, so anything the client raises
            // while it shuts down is not forwarded to subscribers.
            UnbindClientHandlers();

            await Client.StopAsync().ConfigureAwait(false);
        }

        private async void ClientOnApplicationMessageReceived(object sender,
            MqttApplicationMessageReceivedEventArgs e)
        {
            await RunOnScheduler(() => OnMessageReceived?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ClientOnConnectingFailed(object sender, MqttManagedProcessFailedEventArgs e)
        {
            await RunOnScheduler(() => OnClientConnectionFailed?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ClientOnDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            await RunOnScheduler(() => OnClientDisconnected?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ClientOnConnected(object sender, MqttClientConnectedEventArgs e)
        {
            await RunOnScheduler(() => OnClientConnected?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async Task StartServer()
        {
            var optionsBuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpointBoundIPAddress(IPAddress)
                .WithSubscriptionInterceptor(MQTTSubscriptionIntercept)
                .WithDefaultEndpointPort(Port);

            BindServerHandlers();

            await Server.StartAsync(optionsBuilder.Build()).ConfigureAwait(false);

            Running = true;
        }

        private async Task StopServer()
        {
            // Forwarders are detached before stopping, so anything the server raises
            // while it shuts down is not forwarded to subscribers.
            UnbindServerHandlers();

            await Server.StopAsync().ConfigureAwait(false);
        }

        /// <summary>
        ///     Accepts subscriptions to the lobby and exchange topics only; for any other topic
        ///     the subscription is refused and the connection is flagged to be closed.
        /// </summary>
        private void MQTTSubscriptionIntercept(MqttSubscriptionInterceptorContext context)
        {
            var topic = context.TopicFilter.Topic;
            var allowed = topic == LobbyTopic || topic == ExchangeTopic;

            context.AcceptSubscription = allowed;
            context.CloseConnection = !allowed;
        }

        /// <summary>
        ///     Attaches this instance's forwarders to the server's events. Existing attachments
        ///     are removed first so that a repeated start never delivers an event twice.
        /// </summary>
        private void BindServerHandlers()
        {
            UnbindServerHandlers();

            Server.Started += ServerOnStarted;
            Server.Stopped += ServerOnStopped;
            Server.ClientConnected += ServerOnClientConnected;
            Server.ClientDisconnected += ServerOnClientDisconnected;
            Server.ClientSubscribedTopic += ServerOnClientSubscribedTopic;
            Server.ClientUnsubscribedTopic += ServerOnClientUnsubscribedTopic;
            Server.ApplicationMessageReceived += ServerOnApplicationMessageReceived;
        }

        private void UnbindServerHandlers()
        {
            Server.Started -= ServerOnStarted;
            Server.Stopped -= ServerOnStopped;
            Server.ClientConnected -= ServerOnClientConnected;
            Server.ClientDisconnected -= ServerOnClientDisconnected;
            Server.ClientSubscribedTopic -= ServerOnClientSubscribedTopic;
            Server.ClientUnsubscribedTopic -= ServerOnClientUnsubscribedTopic;
            Server.ApplicationMessageReceived -= ServerOnApplicationMessageReceived;
        }

        private async void ServerOnApplicationMessageReceived(object sender,
            MqttApplicationMessageReceivedEventArgs e)
        {
            await RunOnScheduler(() => OnMessageReceived?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ServerOnClientUnsubscribedTopic(object sender, MqttClientUnsubscribedTopicEventArgs e)
        {
            await RunOnScheduler(() => OnClientUnsubscribed?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ServerOnClientSubscribedTopic(object sender, MqttClientSubscribedTopicEventArgs e)
        {
            await RunOnScheduler(() => OnClientSubscribed?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ServerOnClientDisconnected(object sender,
            MQTTnet.Server.MqttClientDisconnectedEventArgs e)
        {
            await RunOnScheduler(() => OnServerClientDisconnected?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ServerOnClientConnected(object sender, MQTTnet.Server.MqttClientConnectedEventArgs e)
        {
            await RunOnScheduler(() => OnServerClientConnected?.Invoke(sender, e)).ConfigureAwait(false);
        }

        // Started/Stopped go through the scheduler like every other forwarded event,
        // so subscribers see all events on the same scheduler.
        private async void ServerOnStopped(object sender, EventArgs e)
        {
            await RunOnScheduler(() => OnServerStopped?.Invoke(sender, e)).ConfigureAwait(false);
        }

        private async void ServerOnStarted(object sender, EventArgs e)
        {
            await RunOnScheduler(() => OnServerStarted?.Invoke(sender, e)).ConfigureAwait(false);
        }
    }
}