Zzz – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel;
4 using System.IO;
5 using System.Text;
6 using System.Threading;
7 using System.Threading.Tasks;
8 using MQTTnet;
9 using MQTTnet.Client;
10 using MQTTnet.Client.Connecting;
11 using MQTTnet.Client.Disconnecting;
12 using MQTTnet.Client.Options;
13 using MQTTnet.Client.Receiving;
14 using MQTTnet.Client.Subscribing;
15 using Newtonsoft.Json;
16 using Serilog;
17 using Zzz.Action;
18 using Zzz.Properties;
19 using Zzz.State;
20  
21 namespace Zzz.Clients
22 {
/// <summary>
///     Wraps an MQTTnet client: connects to the configured broker, subscribes to the
///     "Action" and "State" sub-topics of the configured topic, publishes actions and
///     re-raises received messages and connection changes as .NET events.
/// </summary>
public class MqttClient : IDisposable
{
    #region Static Fields and Constants

    // NOTE(review): the client and its cancellation source are static although they are
    // created and torn down by instance methods, so all instances of this class share a
    // single connection. Confirm that only one instance is ever meant to exist.
    private static IMqttClient _mqttClient;

    private static CancellationTokenSource _mqttCancellationTokenSource;

    private static CancellationToken _mqttCancellationToken;

    #endregion

    #region Public Events & Delegates

    /// <summary>Raised for every topic filter the broker granted a subscription for.</summary>
    public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeSucceeded;

    /// <summary>Raised for every topic filter the broker refused a subscription for.</summary>
    public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeFailed;

    /// <summary>Raised when the underlying client reports that it is connected.</summary>
    public event EventHandler MqttConnectionSucceeded;

    /// <summary>Raised when subscribing after a successful connection throws.</summary>
    public event EventHandler MqttConnectionFailed;

    /// <summary>Raised when the client disconnects, including after <see cref="Stop" />.</summary>
    public event EventHandler MqttDisconnected;

    /// <summary>Raised when a message on the "State" sub-topic has been decoded.</summary>
    public event EventHandler<MqttStateReceivedEventArgs> MqttStateReceived;

    /// <summary>Raised when a message on the "Action" sub-topic has been decoded.</summary>
    public event EventHandler<MqttActionReceivedEventArgs> MqttActionReceived;

    #endregion

    #region Public Enums, Properties and Fields

    /// <summary>Tracks the connection state as reported by the connection events.</summary>
    public bool Connected { get; set; }

    /// <summary>Tracks the subscription state as reported by the subscription events.</summary>
    public bool Subscribed { get; set; }

    #endregion

    #region Private Delegates, Events, Enums, Properties, Indexers and Fields

    private IMqttClientOptions _mqttClientOptions;
    private readonly Configuration.Configuration _configuration;

    #endregion

    #region Constructors, Destructors and Finalizers

    /// <summary>Creates a client that reads its broker settings from <paramref name="configuration" />.</summary>
    /// <param name="configuration">Source of the server, port, credentials, topic and enable flag.</param>
    public MqttClient(Configuration.Configuration configuration) : this()
    {
        _configuration = configuration;
    }

    private MqttClient()
    {
        // The class listens to its own events to keep Connected / Subscribed up to date.
        MqttSubscribeSucceeded += MqttClient_MqttSubscribeSucceeded;
        MqttSubscribeFailed += MqttClient_MqttSubscribeFailed;
        MqttConnectionSucceeded += MqttClient_MqttConnectionSucceeded;
        MqttConnectionFailed += MqttClient_MqttConnectionFailed;
        MqttDisconnected += MqttClient_MqttDisconnected;
    }

    /// <summary>Cancels pending operations and releases the client and the cancellation source.</summary>
    public void Dispose()
    {
        ReleaseConnection();
    }

    #endregion

    #region Event Handlers

    private void MqttClient_MqttDisconnected(object sender, EventArgs e)
    {
        Connected = false;
        Subscribed = false;
    }

    private void MqttClient_MqttConnectionFailed(object sender, EventArgs e)
    {
        Connected = false;
        Subscribed = false;
    }

    private void MqttClient_MqttConnectionSucceeded(object sender, EventArgs e)
    {
        Connected = true;
    }

    private void MqttClient_MqttSubscribeFailed(object sender, MqttClientSubscribeResultCode e)
    {
        Subscribed = false;
    }

    private void MqttClient_MqttSubscribeSucceeded(object sender, MqttClientSubscribeResultCode e)
    {
        Subscribed = true;
    }

    #endregion

    #region Public Methods

    /// <summary>
    ///     Serializes <paramref name="action" /> as JSON and publishes it on the configured topic.
    ///     Does nothing when no client exists or the client is not connected; failures are
    ///     logged as warnings and never thrown.
    /// </summary>
    /// <param name="action">The action to wrap in a <c>ZzzAction</c> and send.</param>
    public async Task Publish(Action.Action action)
    {
        try
        {
            // Read the shared field once so a concurrent Stop / Dispose cannot null it
            // between the check and the use.
            var client = _mqttClient;
            if (client == null || !client.IsConnected)
            {
                return;
            }

            var json = JsonConvert.SerializeObject(new ZzzAction(action));

            var message = new MqttApplicationMessage
            {
                Payload = Encoding.UTF8.GetBytes(json),
                Topic = _configuration.MqttTopic
            };

            await client.PublishAsync(message, _mqttCancellationToken);
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Unable to publish sleep event to MQTT broker.");
        }
    }

    /// <summary>
    ///     Creates a fresh client, wires up its handlers and connects to the configured broker.
    ///     Connection errors are logged and not thrown.
    /// </summary>
    public async Task Start()
    {
        // Release whatever an earlier Start left behind so it is not leaked.
        ReleaseConnection();

        _mqttCancellationTokenSource = new CancellationTokenSource();
        _mqttCancellationToken = _mqttCancellationTokenSource.Token;

        var client = new MqttFactory().CreateMqttClient();
        client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedHandler);
        client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(MqttClientConnectedHandler);
        client.ApplicationMessageReceivedHandler =
            new MqttApplicationMessageReceivedHandlerDelegate(MqttApplicationMessageReceivedHandler);
        _mqttClient = client;

        _mqttClientOptions = BuildOptionsFromConfiguration();

        try
        {
            await client.ConnectAsync(_mqttClientOptions, _mqttCancellationToken);
        }
        catch (OperationCanceledException exception)
        {
            Log.Information(exception, "MQTT connection has been cancelled.");
        }
        catch (Exception exception)
        {
            // Anything else is a real connection failure, not a cancellation.
            Log.Warning(exception, "Unable to connect to MQTT broker.");
        }
    }

    /// <summary>
    ///     Cancels pending operations, disconnects if connected (raising
    ///     <see cref="MqttDisconnected" />) and releases the client and the cancellation source.
    ///     Does nothing when the client was never started.
    /// </summary>
    public async Task Stop()
    {
        var source = _mqttCancellationTokenSource;
        if (source == null)
        {
            return;
        }

        try
        {
            source.Cancel();

            var client = _mqttClient;
            if (client != null && client.IsConnected)
            {
                await client.DisconnectAsync();

                MqttDisconnected?.Invoke(this, EventArgs.Empty);
            }
        }
        finally
        {
            // Always clean up, even when the client was not connected or disconnecting threw.
            _mqttCancellationTokenSource = null;
            source.Dispose();
            ReleaseClient();
        }
    }

    /// <summary>
    ///     Stops the client and, when MQTT is enabled in the configuration, starts it again.
    ///     The returned task completes only after the new connection attempt has finished.
    /// </summary>
    public async Task Restart()
    {
        if (!_configuration.MqttEnable)
        {
            await Stop();

            return;
        }

        try
        {
            await Stop();
        }
        catch (Exception ex)
        {
            // A failed shutdown must not prevent the client from being started again.
            Log.Warning(ex, "Unable to stop MQTT client cleanly before restarting.");
        }

        await Start();
    }

    #endregion

    #region Private Methods

    /// <summary>Cancels and disposes the cancellation source, then disposes the client.</summary>
    private static void ReleaseConnection()
    {
        var source = _mqttCancellationTokenSource;
        _mqttCancellationTokenSource = null;

        try
        {
            if (source != null)
            {
                try
                {
                    source.Cancel();
                }
                finally
                {
                    source.Dispose();
                }
            }
        }
        finally
        {
            ReleaseClient();
        }
    }

    /// <summary>Disposes the shared client, if any, and clears the field.</summary>
    private static void ReleaseClient()
    {
        var client = _mqttClient;
        _mqttClient = null;
        client?.Dispose();
    }

    /// <summary>Builds connection options from the current configuration values.</summary>
    private IMqttClientOptions BuildOptionsFromConfiguration()
    {
        return BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort,
            _configuration.MqttUsername, _configuration.MqttPassword);
    }

    /// <summary>
    ///     Decodes an incoming message. The last topic segment selects the message kind
    ///     ("Action" or "State", case-insensitive); everything before it must equal the
    ///     configured topic, otherwise the message is ignored.
    /// </summary>
    private void MqttApplicationMessageReceivedHandler(MqttApplicationMessageReceivedEventArgs arg)
    {
        // Do not bother with MQTT clients that do not set a client ID.
        if (string.IsNullOrEmpty(arg.ClientId))
        {
            return;
        }

        // A message without a body carries a null payload; treat it as an empty string
        // instead of letting the decoder throw.
        var rawPayload = arg.ApplicationMessage.Payload;
        var payload = rawPayload == null ? string.Empty : Encoding.UTF8.GetString(rawPayload);

        // Pass the payload as a property: it is JSON and its braces must not be parsed
        // as part of the message template.
        Log.Information("Message from MQTT broker received: {Payload:l}", payload);

        var segments = arg.ApplicationMessage.Topic.Split('/');
        var triggerIndex = segments.Length - 1;

        // Only listen on configured topic.
        if (string.Join("/", segments, 0, triggerIndex) != _configuration.MqttTopic)
        {
            return;
        }

        switch (segments[triggerIndex].ToUpperInvariant())
        {
            case "ACTION":
                try
                {
                    var action = JsonConvert.DeserializeObject<ZzzAction>(payload);

                    if (action == null)
                    {
                        throw new ArgumentException("Invalid action received from MQTT broker.");
                    }

                    MqttActionReceived?.Invoke(this, new MqttActionReceivedEventArgs(action));
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "Unable to decode action change MQTT request.");
                }

                break;
            case "STATE":
                try
                {
                    var state = JsonConvert.DeserializeObject<ZzzState>(payload);

                    if (state == null)
                    {
                        throw new ArgumentException("Invalid state received from MQTT broker.");
                    }

                    MqttStateReceived?.Invoke(this, new MqttStateReceivedEventArgs(state));
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "Unable to decode state change MQTT request.");
                }

                break;
        }
    }

    /// <summary>
    ///     Runs once the client is connected: raises <see cref="MqttConnectionSucceeded" />,
    ///     subscribes to the "Action" and "State" sub-topics and reports each result.
    /// </summary>
    private async Task MqttClientConnectedHandler(MqttClientConnectedEventArgs args)
    {
        if (!_configuration.MqttEnable)
        {
            return;
        }

        MqttConnectionSucceeded?.Invoke(this, EventArgs.Empty);

        try
        {
            var options = new MqttClientSubscribeOptions
            {
                TopicFilters = new List<MqttTopicFilter>
                {
                    new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/Action"},
                    new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/State"}
                }
            };

            var result = await _mqttClient.SubscribeAsync(options, _mqttCancellationToken);

            foreach (var item in result.Items)
            {
                switch (item.ResultCode)
                {
                    case MqttClientSubscribeResultCode.GrantedQoS0:
                    case MqttClientSubscribeResultCode.GrantedQoS1:
                    case MqttClientSubscribeResultCode.GrantedQoS2:
                        MqttSubscribeSucceeded?.Invoke(this, item.ResultCode);
                        break;

                    default:
                        MqttSubscribeFailed?.Invoke(this, item.ResultCode);
                        break;
                }
            }
        }
        catch (Exception ex)
        {
            // Keep a record of the cause before reporting the failure to subscribers.
            Log.Warning(ex, "Unable to subscribe to MQTT topics.");

            MqttConnectionFailed?.Invoke(this, EventArgs.Empty);
        }
    }

    /// <summary>
    ///     Runs when the client disconnects: raises <see cref="MqttDisconnected" />, waits
    ///     one second and tries to connect again unless the client is gone, already
    ///     connected, or the operation has been cancelled.
    /// </summary>
    private async Task MqttClientDisconnectedHandler(MqttClientDisconnectedEventArgs args)
    {
        if (!_configuration.MqttEnable)
        {
            return;
        }

        MqttDisconnected?.Invoke(this, EventArgs.Empty);

        try
        {
            var cancellationToken = _mqttCancellationToken;

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);

            // Stop / Dispose may have released the client while this handler was waiting.
            var client = _mqttClient;
            if (client == null || client.IsConnected)
            {
                return;
            }

            await client.ConnectAsync(BuildOptionsFromConfiguration(), cancellationToken);
        }
        catch (OperationCanceledException ex)
        {
            // Expected while stopping; not worth a warning.
            Log.Debug(ex, "MQTT reconnect has been cancelled.");
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Disconnected from MQTT server.");
        }
    }

    /// <summary>Builds TCP connection options with a one minute communication timeout.</summary>
    private static IMqttClientOptions BuildMqttClient(string server, int port, string username, string password)
    {
        return new MqttClientOptionsBuilder()
            .WithCommunicationTimeout(TimeSpan.FromMinutes(1))
            .WithTcpServer(server, port)
            .WithCredentials(username, password)
            .Build();
    }

    #endregion
}
365 }