Zzz – Blame information for rev 5

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel;
4 using System.IO;
5 using System.Text;
6 using System.Threading;
7 using System.Threading.Tasks;
8 using MQTTnet;
9 using MQTTnet.Client;
5 office 10 using MQTTnet.Packets;
1 office 11 using Newtonsoft.Json;
12 using Serilog;
13 using Zzz.Action;
14 using Zzz.Properties;
15 using Zzz.State;
16  
17 namespace Zzz.Clients
18 {
19 public class MqttClient : IDisposable
20 {
21 #region Static Fields and Constants
22  
23 private static IMqttClient _mqttClient;
24  
25 private static CancellationTokenSource _mqttCancellationTokenSource;
26  
27 private static CancellationToken _mqttCancellationToken;
28  
29 #endregion
30  
31 #region Public Events & Delegates
32  
33 public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeSucceeded;
34  
35 public event EventHandler<MqttClientSubscribeResultCode> MqttSubscribeFailed;
36  
37 public event EventHandler MqttConnectionSucceeded;
38  
39 public event EventHandler MqttConnectionFailed;
40  
41 public event EventHandler MqttDisconnected;
42  
43 public event EventHandler<MqttStateReceivedEventArgs> MqttStateReceived;
44  
45 public event EventHandler<MqttActionReceivedEventArgs> MqttActionReceived;
46  
47 #endregion
48  
49 #region Public Enums, Properties and Fields
50  
51 public bool Connected { get; set; }
52  
53 public bool Subscribed { get; set; }
54  
55 #endregion
56  
57 #region Private Delegates, Events, Enums, Properties, Indexers and Fields
58  
5 office 59 private MqttClientOptions _iMqttClient;
60 private readonly Configuration.Configuration _configuration;
1 office 61  
62 #endregion
63  
64 #region Constructors, Destructors and Finalizers
65  
66 public MqttClient(Configuration.Configuration configuration) : this()
67 {
68 _configuration = configuration;
69 }
70  
71 private MqttClient()
72 {
73 MqttSubscribeSucceeded += MqttClient_MqttSubscribeSucceeded;
74 MqttSubscribeFailed += MqttClient_MqttSubscribeFailed;
75 MqttConnectionSucceeded += MqttClient_MqttConnectionSucceeded;
76 MqttConnectionFailed += MqttClient_MqttConnectionFailed;
77 MqttDisconnected += MqttClient_MqttDisconnected;
78 }
79  
80 public void Dispose()
81 {
82 _mqttCancellationTokenSource?.Cancel();
83 _mqttCancellationTokenSource = null;
84  
85 _mqttClient?.Dispose();
86 _mqttClient = null;
87 }
88  
89 #endregion
90  
91 #region Event Handlers
92  
93 private void MqttClient_MqttDisconnected(object sender, EventArgs e)
94 {
95 Connected = false;
96 Subscribed = false;
97 }
98  
99 private void MqttClient_MqttConnectionFailed(object sender, EventArgs e)
100 {
101 Connected = false;
102 Subscribed = false;
103 }
104  
105 private void MqttClient_MqttConnectionSucceeded(object sender, EventArgs e)
106 {
107 Connected = true;
108 }
109  
110 private void MqttClient_MqttSubscribeFailed(object sender, MqttClientSubscribeResultCode e)
111 {
112 Subscribed = false;
113 }
114  
115 private void MqttClient_MqttSubscribeSucceeded(object sender, MqttClientSubscribeResultCode e)
116 {
117 Subscribed = true;
118 }
119  
120 #endregion
121  
122 #region Public Methods
123  
124 public async Task Publish(Action.Action action)
125 {
126 try
127 {
128 if (_mqttClient == null || !_mqttClient.IsConnected)
129 {
130 return;
131 }
132  
133 var message = new MqttApplicationMessage();
134  
135 var payload = JsonConvert.SerializeObject(new ZzzAction(action));
136  
137 using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload)))
138 {
5 office 139 message.PayloadSegment = new ArraySegment<byte>(memoryStream.ToArray());
1 office 140 message.Topic = _configuration.MqttTopic;
141  
142 await _mqttClient.PublishAsync(message, _mqttCancellationToken);
143 }
144 }
145 catch (Exception ex)
146 {
147 Log.Warning(ex, "Unable to publish sleep event to MQTT broker.");
148 }
149 }
150  
151 public async Task Start()
152 {
153 _mqttCancellationTokenSource = new CancellationTokenSource();
154 _mqttCancellationToken = _mqttCancellationTokenSource.Token;
155  
156 _mqttClient = new MqttFactory().CreateMqttClient();
5 office 157 _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
158 _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
159 _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
1 office 160  
161 _iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort,
162 _configuration.MqttUsername, _configuration.MqttPassword);
163  
164 try
165 {
166 await _mqttClient.ConnectAsync(_iMqttClient, _mqttCancellationToken);
167 }
168 catch(Exception exception)
169 {
170 Log.Information(exception, "MQTT connection has been cancelled.");
171 }
172 }
173  
5 office 174 private async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
1 office 175 {
176 // Do not bother with MQTT clients that do not set a client ID.
177 if (string.IsNullOrEmpty(arg.ClientId))
178 {
179 return;
180 }
181  
5 office 182 var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array ?? Array.Empty<byte>());
1 office 183  
184 Log.Information($"Message from MQTT broker received: {payload}");
185  
186 // Only listen on configured MQTT topic.
187 var topic = arg.ApplicationMessage.Topic.Split('/');
188  
189 var messageTopic = new string[topic.Length - 1];
190 Array.Copy(topic, messageTopic, topic.Length - 1);
191  
192 // Only listen on configured topic.
193 if (string.Join("/", messageTopic) != _configuration.MqttTopic)
194 {
195 return;
196 }
197  
198 var zzzTrigger = topic[topic.Length - 1].ToUpperInvariant();
199  
200 switch (zzzTrigger)
201 {
202 case "ACTION":
203 try
204 {
5 office 205 var message = (ZzzAction)JsonConvert.DeserializeObject(payload, typeof(ZzzAction));
1 office 206  
207 if (message == null)
208 {
209 throw new ArgumentException("Invalid action received from MQTT broker.");
210 }
211  
212 MqttActionReceived?.Invoke(this, new MqttActionReceivedEventArgs(message));
213 }
214 catch (Exception ex)
215 {
216 Log.Warning(ex, "Unable to decode action change MQTT request.");
217 }
218  
219 break;
220 case "STATE":
221 try
222 {
5 office 223 var message = (ZzzState)JsonConvert.DeserializeObject(payload, typeof(ZzzState));
1 office 224  
225 if (message == null)
226 {
227 throw new ArgumentException("Invalid state received from MQTT broker.");
228 }
229  
230 MqttStateReceived?.Invoke(this, new MqttStateReceivedEventArgs(message));
231 }
232 catch (Exception ex)
233 {
234 Log.Warning(ex, "Unable to decode state change MQTT request.");
235 }
236  
237 break;
238 }
239 }
240  
5 office 241 private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
1 office 242 {
243 if (!_configuration.MqttEnable)
244 {
245 return;
246 }
247  
248 MqttConnectionSucceeded?.Invoke(this, EventArgs.Empty);
249  
250 try
251 {
252 var subscribe = await _mqttClient.SubscribeAsync(
253 new MqttClientSubscribeOptions
254 {
255 TopicFilters = new List<MqttTopicFilter>
256 {
257 new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/Action"},
258 new MqttTopicFilter {Topic = $"{_configuration.MqttTopic}/State"}
259 }
260 }, _mqttCancellationToken);
261  
262 foreach (var sub in subscribe.Items)
263 {
264 switch (sub.ResultCode)
265 {
266 case MqttClientSubscribeResultCode.GrantedQoS0:
267 case MqttClientSubscribeResultCode.GrantedQoS1:
268 case MqttClientSubscribeResultCode.GrantedQoS2:
269 MqttSubscribeSucceeded?.Invoke(this, sub.ResultCode);
270 break;
271  
272 default:
273 MqttSubscribeFailed?.Invoke(this, sub.ResultCode);
274 break;
275 }
276 }
277 }
278 catch
279 {
280 MqttConnectionFailed?.Invoke(this, EventArgs.Empty);
281 }
282 }
283  
5 office 284 private async Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
1 office 285 {
286 if (!_configuration.MqttEnable)
287 {
288 return;
289 }
290  
291 MqttDisconnected?.Invoke(this, EventArgs.Empty);
292  
293 try
294 {
295 await Task.Delay(TimeSpan.FromSeconds(1), _mqttCancellationToken);
296  
297 if (_mqttClient.IsConnected)
298 {
299 return;
300 }
301  
302 await _mqttClient.ConnectAsync(
303 BuildMqttClient(_configuration.MqttServer, (int)_configuration.MqttPort,
304 _configuration.MqttUsername,
305 _configuration.MqttPassword), _mqttCancellationToken);
306 }
307 catch (Exception ex)
308 {
309 Log.Warning(ex, "Disconnected from MQTT server.");
310 }
311 }
312  
5 office 313 public async Task Stop()
1 office 314 {
5 office 315 if (_mqttCancellationTokenSource != null)
316 {
317 _mqttCancellationTokenSource.Cancel();
318  
319 if (_mqttClient != null && _mqttClient.IsConnected)
320 {
321 await _mqttClient.DisconnectAsync();
322  
323 MqttDisconnected?.Invoke(this, EventArgs.Empty);
324  
325 _mqttClient?.Dispose();
326 _mqttClient = null;
327 }
328  
329 _mqttCancellationTokenSource = null;
330 }
331 }
332  
333 #endregion
334  
335 #region Private Methods
336  
337 public async Task Restart()
338 {
339 if (!_configuration.MqttEnable)
340 {
341 await Stop();
342  
343 return;
344 }
345  
346 await Stop().ContinueWith(async _ => await Start());
347 }
348  
349 private static MqttClientOptions BuildMqttClient(string server, int port, string username, string password)
350 {
1 office 351 return new MqttClientOptionsBuilder()
5 office 352 .WithTimeout(TimeSpan.FromMinutes(1))
1 office 353 .WithTcpServer(server, port)
354 .WithCredentials(username, password)
355 .Build();
356 }
357  
358 #endregion
359 }
360 }