Zzz – Diff between revs 1 and 5

Subversion Repositories:
Rev:
Show entire file | Ignore whitespace
Rev 1 Rev 5
Line 5... Line 5...
5 using System.Text; 5 using System.Text;
6 using System.Threading; 6 using System.Threading;
7 using System.Threading.Tasks; 7 using System.Threading.Tasks;
8 using MQTTnet; 8 using MQTTnet;
9 using MQTTnet.Client; 9 using MQTTnet.Client;
10 using MQTTnet.Client.Connecting; -  
11 using MQTTnet.Client.Disconnecting; -  
12 using MQTTnet.Client.Options; 10 using MQTTnet.Packets;
13 using MQTTnet.Client.Receiving; -  
14 using MQTTnet.Client.Subscribing; -  
15 using Newtonsoft.Json; 11 using Newtonsoft.Json;
16 using Serilog; 12 using Serilog;
17 using Zzz.Action; 13 using Zzz.Action;
18 using Zzz.Properties; 14 using Zzz.Properties;
19 using Zzz.State; 15 using Zzz.State;
Line 58... Line 54...
58   54  
Line 59... Line 55...
59 #endregion 55 #endregion
Line 60... Line 56...
60   56  
61 #region Private Delegates, Events, Enums, Properties, Indexers and Fields 57 #region Private Delegates, Events, Enums, Properties, Indexers and Fields
Line 62... Line 58...
62   58  
Line 63... Line 59...
63 private IMqttClientOptions _iMqttClient; 59 private MqttClientOptions _iMqttClient;
Line 138... Line 134...
138   134  
Line 139... Line 135...
139 var payload = JsonConvert.SerializeObject(new ZzzAction(action)); 135 var payload = JsonConvert.SerializeObject(new ZzzAction(action));
140   136  
141 using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload))) 137 using (var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(payload)))
142 { 138 {
Line 143... Line 139...
143 message.Payload = memoryStream.ToArray(); 139 message.PayloadSegment = new ArraySegment<byte>(memoryStream.ToArray());
144 message.Topic = _configuration.MqttTopic; 140 message.Topic = _configuration.MqttTopic;
145   141  
Line 156... Line 152...
156 { 152 {
157 _mqttCancellationTokenSource = new CancellationTokenSource(); 153 _mqttCancellationTokenSource = new CancellationTokenSource();
158 _mqttCancellationToken = _mqttCancellationTokenSource.Token; 154 _mqttCancellationToken = _mqttCancellationTokenSource.Token;
Line 159... Line 155...
159   155  
160 _mqttClient = new MqttFactory().CreateMqttClient(); 156 _mqttClient = new MqttFactory().CreateMqttClient();
161 _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedHandler); 157 _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
162 _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(MqttClientConnectedHandler); -  
163 _mqttClient.ApplicationMessageReceivedHandler = 158 _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
Line 164... Line 159...
164 new MqttApplicationMessageReceivedHandlerDelegate(MqttApplicationMessageReceivedHandler); 159 _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
165   160  
Line 166... Line 161...
166 _iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort, 161 _iMqttClient = BuildMqttClient(_configuration.MqttServer, (int) _configuration.MqttPort,
Line 174... Line 169...
174 { 169 {
175 Log.Information(exception, "MQTT connection has been cancelled."); 170 Log.Information(exception, "MQTT connection has been cancelled.");
176 } 171 }
177 } 172 }
Line 178... Line -...
178   -  
179 public async Task Stop() -  
180 { -  
181 if (_mqttCancellationTokenSource != null) -  
182 { -  
183 _mqttCancellationTokenSource.Cancel(); -  
184   -  
185 if (_mqttClient != null && _mqttClient.IsConnected) -  
186 { -  
187 await _mqttClient.DisconnectAsync(); -  
188   -  
189 MqttDisconnected?.Invoke(this, EventArgs.Empty); -  
190   -  
191 _mqttClient?.Dispose(); -  
192 _mqttClient = null; -  
193 } -  
194   -  
195 _mqttCancellationTokenSource = null; -  
196 } -  
197 } -  
198   -  
199 #endregion -  
200   -  
201 #region Private Methods -  
202   -  
203 public async Task Restart() -  
204 { -  
205 if (!_configuration.MqttEnable) -  
206 { -  
207 await Stop(); -  
208   -  
209 return; -  
210 } -  
211   -  
212 await Stop().ContinueWith(async _ => await Start()); -  
213 } -  
214   173  
215 private void MqttApplicationMessageReceivedHandler(MqttApplicationMessageReceivedEventArgs arg) 174 private async Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
216 { 175 {
217 // Do not bother with MQTT clients that do not set a client ID. 176 // Do not bother with MQTT clients that do not set a client ID.
218 if (string.IsNullOrEmpty(arg.ClientId)) 177 if (string.IsNullOrEmpty(arg.ClientId))
219 { 178 {
220 return; 179 return;
Line 221... Line 180...
221 } 180 }
Line 222... Line 181...
222   181  
Line 223... Line 182...
223 var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload); 182 var payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array ?? Array.Empty<byte>());
224   183  
Line 241... Line 200...
241 switch (zzzTrigger) 200 switch (zzzTrigger)
242 { 201 {
243 case "ACTION": 202 case "ACTION":
244 try 203 try
245 { 204 {
246 var message = (ZzzAction) JsonConvert.DeserializeObject(payload, typeof(ZzzAction)); 205 var message = (ZzzAction)JsonConvert.DeserializeObject(payload, typeof(ZzzAction));
Line 247... Line 206...
247   206  
248 if (message == null) 207 if (message == null)
249 { 208 {
250 throw new ArgumentException("Invalid action received from MQTT broker."); 209 throw new ArgumentException("Invalid action received from MQTT broker.");
Line 259... Line 218...
259   218  
260 break; 219 break;
261 case "STATE": 220 case "STATE":
262 try 221 try
263 { 222 {
Line 264... Line 223...
264 var message = (ZzzState) JsonConvert.DeserializeObject(payload, typeof(ZzzState)); 223 var message = (ZzzState)JsonConvert.DeserializeObject(payload, typeof(ZzzState));
265   224  
266 if (message == null) 225 if (message == null)
267 { 226 {
Line 277... Line 236...
277   236  
278 break; 237 break;
279 } 238 }
Line 280... Line 239...
280 } 239 }
281   240  
282 private async Task MqttClientConnectedHandler(MqttClientConnectedEventArgs args) 241 private async Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
283 { 242 {
284 if (!_configuration.MqttEnable) 243 if (!_configuration.MqttEnable)
285 { 244 {
Line 320... Line 279...
320 { 279 {
321 MqttConnectionFailed?.Invoke(this, EventArgs.Empty); 280 MqttConnectionFailed?.Invoke(this, EventArgs.Empty);
322 } 281 }
323 } 282 }
Line 324... Line 283...
324   283  
325 private async Task MqttClientDisconnectedHandler(MqttClientDisconnectedEventArgs args) 284 private async Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
326 { 285 {
327 if (!_configuration.MqttEnable) 286 if (!_configuration.MqttEnable)
328 { 287 {
329 return; 288 return;
Line 349... Line 308...
349 { 308 {
350 Log.Warning(ex, "Disconnected from MQTT server."); 309 Log.Warning(ex, "Disconnected from MQTT server.");
351 } 310 }
352 } 311 }
Line -... Line 312...
-   312  
-   313 public async Task Stop()
-   314 {
-   315 if (_mqttCancellationTokenSource != null)
-   316 {
-   317 _mqttCancellationTokenSource.Cancel();
-   318  
-   319 if (_mqttClient != null && _mqttClient.IsConnected)
-   320 {
-   321 await _mqttClient.DisconnectAsync();
-   322  
-   323 MqttDisconnected?.Invoke(this, EventArgs.Empty);
-   324  
-   325 _mqttClient?.Dispose();
-   326 _mqttClient = null;
-   327 }
-   328  
-   329 _mqttCancellationTokenSource = null;
-   330 }
-   331 }
-   332  
-   333 #endregion
-   334  
-   335 #region Private Methods
-   336  
-   337 public async Task Restart()
-   338 {
-   339 if (!_configuration.MqttEnable)
-   340 {
-   341 await Stop();
-   342  
-   343 return;
-   344 }
-   345  
-   346 await Stop().ContinueWith(async _ => await Start());
-   347 }
353   348  
354 private static IMqttClientOptions BuildMqttClient(string server, int port, string username, string password) 349 private static MqttClientOptions BuildMqttClient(string server, int port, string username, string password)
355 { 350 {
356 return new MqttClientOptionsBuilder() 351 return new MqttClientOptionsBuilder()
357 .WithCommunicationTimeout(TimeSpan.FromMinutes(1)) 352 .WithTimeout(TimeSpan.FromMinutes(1))
358 .WithTcpServer(server, port) 353 .WithTcpServer(server, port)
359 .WithCredentials(username, password) 354 .WithCredentials(username, password)
360 .Build(); 355 .Build();