Zzz – Diff between revs 1 and 5
Rev 1 | Rev 5 | |||
---|---|---|---|---|
Line 5... | Line 5... | |||
5 | using System.Text; |
5 | using System.Text; |
|
6 | using System.Threading; |
6 | using System.Threading; |
|
7 | using System.Threading.Tasks; |
7 | using System.Threading.Tasks; |
|
8 | using MQTTnet; |
8 | using MQTTnet; |
|
9 | using MQTTnet.Client; |
9 | using MQTTnet.Client; |
|
10 | using MQTTnet.Client.Connecting; |
- | ||
11 | using MQTTnet.Client.Disconnecting; |
- | ||
12 | using MQTTnet.Client.Options; |
10 | using MQTTnet.Packets; |
|
13 | using MQTTnet.Client.Receiving; |
- | ||
14 | using MQTTnet.Client.Subscribing; |
- | ||
15 | using Newtonsoft.Json; |
11 | using Newtonsoft.Json; |
|
16 | using Serilog; |
12 | using Serilog; |
|
17 | using Zzz.Action; |
13 | using Zzz.Action; |
|
18 | using Zzz.Properties; |
14 | using Zzz.Properties; |
|
19 | using Zzz.State; |
15 | using Zzz.State; |
|
Line 58... | Line 54... | |||
58 | |
54 | |
|
Line 59... | Line 55... | |||
59 | #endregion |
55 | #endregion |
|
Line 60... | Line 56... | |||
60 | |
56 | |
|
61 | #region Private Delegates, Events, Enums, Properties, Indexers and Fields |
57 | #region Private Delegates, Events, Enums, Properties, Indexers and Fields |
|
Line 62... | Line 58... | |||
62 | |
58 | |
|
Line 63... | Line 59... | |||
63 | private IMqttClientOptions _iMqttClient; |
59 | private MqttClientOptions _iMqttClient; |
|
Line 138... | Line 134... | |||
138 | |
134 | |
|
Line 139... | Line 135... | |||
139 | var payload = JsonConvert.SerializeObject(new ZzzAction(action)); |
135 | var payload = JsonConvert.SerializeObject(new ZzzAction(action)); |
|
140 | |
136 | |
|
141 | using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) |
137 | using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) |
|
142 | { |
138 | { |
|
Line 143... | Line 139... | |||
143 | message.Payload = memoryStream.ToArray(); |
139 | message.PayloadSegment = new ArraySegment<byte>(memoryStream.ToArray()); |
|
144 | message.Topic = _configuration.MqttTopic; |
140 | message.Topic = _configuration.MqttTopic; |
|
145 | |
141 | |
|
Line 156... | Line 152... | |||
156 | { |
152 | { |
|
157 | _mqttCancellationTokenSource = new CancellationTokenSource(); |
153 | _mqttCancellationTokenSource = new CancellationTokenSource(); |
|
158 | _mqttCancellationToken = _mqttCancellationTokenSource.Token; |
154 | _mqttCancellationToken = _mqttCancellationTokenSource.Token; |
|
Line 159... | Line 155... | |||
159 | |
155 | |
|
160 | _mqttClient = new MqttFactory().CreateMqttClient(); |
156 | _mqttClient = new MqttFactory().CreateMqttClient(); |
|
161 | _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedHandler); |
157 | _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; |
|
162 | _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(MqttClientConnectedHandler); |
- | ||
163 | _mqttClient.ApplicationMessageReceivedHandler = |
158 | _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; |
|
Line 164... | Line 159... | |||
164 | new MqttApplicationMessageReceivedHandlerDelegate(MqttApplicationMessageReceivedHandler); |
159 | _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; |
|
165 | |
160 | |
|
Line 166... | Line 161... | |||
166 | _iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort, |
161 | _iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort, |
|
Line 174... | Line 169... | |||
174 | { |
169 | { |
|
175 | Log.Information(exception, "MQTT connection has been cancelled."); |
170 | Log.Information(exception, "MQTT connection has been cancelled."); |
|
176 | } |
171 | } |
|
177 | } |
172 | } |
|
Line 178... | Line -... | |||
178 | |
- | ||
179 | public async Task Stop() |
- | ||
180 | { |
- | ||
181 | if (_mqttCancellationTokenSource != null) |
- | ||
182 | { |
- | ||
183 | _mqttCancellationTokenSource.Cancel(); |
- | ||
184 | |
- | ||
185 | if (_mqttClient != null && _mqttClient.IsConnected) |
- | ||
186 | { |
- | ||
187 | await _mqttClient.DisconnectAsync(); |
- | ||
188 | |
- | ||
189 | MqttDisconnected?.Invoke(this, EventArgs.Empty); |
- | ||
190 | |
- | ||
191 | _mqttClient?.Dispose(); |
- | ||
192 | _mqttClient = null; |
- | ||
193 | } |
- | ||
194 | |
- | ||
195 | _mqttCancellationTokenSource = null; |
- | ||
196 | } |
- | ||
197 | } |
- | ||
198 | |
- | ||
199 | #endregion |
- | ||
200 | |
- | ||
201 | #region Private Methods |
- | ||
202 | |
- | ||
203 | public async Task Restart() |
- | ||
204 | { |
- | ||
205 | if (!_configuration.MqttEnable) |
- | ||
206 | { |
- | ||
207 | await Stop(); |
- | ||
208 | |
- | ||
209 | return; |
- | ||
210 | } |
- | ||
211 | |
- | ||
212 | await Stop().ContinueWith(async _ => await Start()); |
- | ||
213 | } |
- | ||
214 | |
173 | |
|
215 | private void MqttApplicationMessageReceivedHandler(MqttApplicationMessageReceivedEventArgs arg) |
174 | private async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) |
|
216 | { |
175 | { |
|
217 | // Do not bother with MQTT clients that do not set a client ID. |
176 | // Do not bother with MQTT clients that do not set a client ID. |
|
218 | if (string.IsNullOrEmpty(arg.ClientId)) |
177 | if (string.IsNullOrEmpty(arg.ClientId)) |
|
219 | { |
178 | { |
|
220 | return; |
179 | return; |
|
Line 221... | Line 180... | |||
221 | } |
180 | } |
|
Line 222... | Line 181... | |||
222 | |
181 | |
|
Line 223... | Line 182... | |||
223 | var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload); |
182 | var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array ?? Array.Empty<byte>()); |
|
224 | |
183 | |
|
Line 241... | Line 200... | |||
241 | switch (zzzTrigger) |
200 | switch (zzzTrigger) |
|
242 | { |
201 | { |
|
243 | case "ACTION": |
202 | case "ACTION": |
|
244 | try |
203 | try |
|
245 | { |
204 | { |
|
246 | var message = (ZzzAction) JsonConvert.DeserializeObject(payload, typeof(ZzzAction)); |
205 | var message = (ZzzAction)JsonConvert.DeserializeObject(payload, typeof(ZzzAction)); |
|
Line 247... | Line 206... | |||
247 | |
206 | |
|
248 | if (message == null) |
207 | if (message == null) |
|
249 | { |
208 | { |
|
250 | throw new ArgumentException("Invalid action received from MQTT broker."); |
209 | throw new ArgumentException("Invalid action received from MQTT broker."); |
|
Line 259... | Line 218... | |||
259 | |
218 | |
|
260 | break; |
219 | break; |
|
261 | case "STATE": |
220 | case "STATE": |
|
262 | try |
221 | try |
|
263 | { |
222 | { |
|
Line 264... | Line 223... | |||
264 | var message = (ZzzState) JsonConvert.DeserializeObject(payload, typeof(ZzzState)); |
223 | var message = (ZzzState)JsonConvert.DeserializeObject(payload, typeof(ZzzState)); |
|
265 | |
224 | |
|
266 | if (message == null) |
225 | if (message == null) |
|
267 | { |
226 | { |
|
Line 277... | Line 236... | |||
277 | |
236 | |
|
278 | break; |
237 | break; |
|
279 | } |
238 | } |
|
Line 280... | Line 239... | |||
280 | } |
239 | } |
|
281 | |
240 | |
|
282 | private async Task MqttClientConnectedHandler(MqttClientConnectedEventArgs args) |
241 | private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) |
|
283 | { |
242 | { |
|
284 | if (!_configuration.MqttEnable) |
243 | if (!_configuration.MqttEnable) |
|
285 | { |
244 | { |
|
Line 320... | Line 279... | |||
320 | { |
279 | { |
|
321 | MqttConnectionFailed?.Invoke(this, EventArgs.Empty); |
280 | MqttConnectionFailed?.Invoke(this, EventArgs.Empty); |
|
322 | } |
281 | } |
|
323 | } |
282 | } |
|
Line 324... | Line 283... | |||
324 | |
283 | |
|
325 | private async Task MqttClientDisconnectedHandler(MqttClientDisconnectedEventArgs args) |
284 | private async Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) |
|
326 | { |
285 | { |
|
327 | if (!_configuration.MqttEnable) |
286 | if (!_configuration.MqttEnable) |
|
328 | { |
287 | { |
|
329 | return; |
288 | return; |
|
Line 349... | Line 308... | |||
349 | { |
308 | { |
|
350 | Log.Warning(ex, "Disconnected from MQTT server."); |
309 | Log.Warning(ex, "Disconnected from MQTT server."); |
|
351 | } |
310 | } |
|
352 | } |
311 | } |
|
Line -... | Line 312... | |||
- | 312 | |
||
- | 313 | public async Task Stop() |
||
- | 314 | { |
||
- | 315 | if (_mqttCancellationTokenSource != null) |
||
- | 316 | { |
||
- | 317 | _mqttCancellationTokenSource.Cancel(); |
||
- | 318 | |
||
- | 319 | if (_mqttClient != null && _mqttClient.IsConnected) |
||
- | 320 | { |
||
- | 321 | await _mqttClient.DisconnectAsync(); |
||
- | 322 | |
||
- | 323 | MqttDisconnected?.Invoke(this, EventArgs.Empty); |
||
- | 324 | |
||
- | 325 | _mqttClient?.Dispose(); |
||
- | 326 | _mqttClient = null; |
||
- | 327 | } |
||
- | 328 | |
||
- | 329 | _mqttCancellationTokenSource = null; |
||
- | 330 | } |
||
- | 331 | } |
||
- | 332 | |
||
- | 333 | #endregion |
||
- | 334 | |
||
- | 335 | #region Private Methods |
||
- | 336 | |
||
- | 337 | public async Task Restart() |
||
- | 338 | { |
||
- | 339 | if (!_configuration.MqttEnable) |
||
- | 340 | { |
||
- | 341 | await Stop(); |
||
- | 342 | |
||
- | 343 | return; |
||
- | 344 | } |
||
- | 345 | |
||
- | 346 | await Stop().ContinueWith(async _ => await Start()); |
||
- | 347 | } |
||
353 | |
348 | |
|
354 | private static IMqttClientOptions BuildMqttClient(string server, int port, string username, string password) |
349 | private static MqttClientOptions BuildMqttClient(string server, int port, string username, string password) |
|
355 | { |
350 | { |
|
356 | return new MqttClientOptionsBuilder() |
351 | return new MqttClientOptionsBuilder() |
|
357 | .WithCommunicationTimeout(TimeSpan.FromMinutes(1)) |
352 | .WithTimeout(TimeSpan.FromMinutes(1)) |
|
358 | .WithTcpServer(server, port) |
353 | .WithTcpServer(server, port) |
|
359 | .WithCredentials(username, password) |
354 | .WithCredentials(username, password) |
|
360 | .Build(); |
355 | .Build(); |