fst – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | #!/usr/bin/env node |
2 | |||
3 | // Package imports. |
||
4 | const os = require('os') |
||
5 | const readline = require('readline') |
||
6 | const SyncData = require('./SyncData.js') |
||
7 | const constants = require('./constants.js') |
||
8 | const symmetric = require('./symmetric.js') |
||
9 | const fs = require('fs') |
||
10 | const path = require('path') |
||
11 | const YAML = require('yamljs') |
||
12 | const { createLogger, format, transports } = require('winston') |
||
13 | const mqtt = require('mqtt') |
||
14 | const mosca = require('mosca') |
||
15 | const shell = require('shelljs') |
||
16 | const net = require('net') |
||
17 | |||
18 | // Local variables. |
||
19 | const mqttClients = [] |
||
20 | const peers = [] |
||
21 | |||
22 | // Load configuration file. |
||
23 | const config = YAML.load( |
||
24 | path.join( |
||
25 | path.dirname( |
||
26 | require.main.filename |
||
27 | ), |
||
28 | 'config.yml' |
||
29 | ) |
||
30 | ) |
||
31 | |||
32 | // Set up logger. |
||
33 | const logger = createLogger({ |
||
34 | format: format.combine( |
||
35 | format.splat(), |
||
36 | format.simple() |
||
37 | ), |
||
38 | transports: [ |
||
39 | new transports.Console(), |
||
40 | new transports.File({ filename: config.log }) |
||
41 | ] |
||
42 | }) |
||
43 | |||
44 | // Read peer list. |
||
45 | readline.createInterface({ |
||
46 | input: fs.createReadStream(config.peers), |
||
47 | terminal: false |
||
48 | }).on('line', function (server) { |
||
49 | // Push the peers for later discovery usage. |
||
50 | peers.push(server) |
||
51 | |||
52 | // Add the the MQTT server to the pool and connect. |
||
53 | addMqttPeer(server) |
||
54 | }) |
||
55 | |||
56 | // Create the socket server to listen for payloads. |
||
57 | var socketServer = net.createServer(function (stream) { |
||
58 | stream.on('data', onSocketSyncDataReceived) |
||
59 | }).on('listening', function () { |
||
60 | logger.info('Socket server listening') |
||
61 | }).on('error', function (error) { |
||
62 | if (error.code == 'EADDRINUSE') { |
||
63 | logger.warn('socket in use, cleaning up...') |
||
64 | setTimeout(function () { |
||
65 | fs.unlink(config.distribution.socket, function (error) { |
||
66 | if (error) { |
||
67 | logger.error('unable to remove socket') |
||
68 | return |
||
69 | } |
||
70 | logger.info('removed stale socket') |
||
71 | socketServer.listen(config.distribution.socket) |
||
72 | }) |
||
73 | }, 1000) |
||
74 | } |
||
75 | }) |
||
76 | |||
77 | // Listen on the configured socket. |
||
78 | socketServer.listen(config.distribution.socket) |
||
79 | |||
80 | // Create the MQTT server. |
||
81 | var mqttServer = new mosca.Server({ |
||
82 | port: config.distribution.port |
||
83 | }).on('ready', function () { |
||
84 | logger.info('MQTT server listening') |
||
85 | }).on('published', function (packet, client) { |
||
86 | switch (packet.topic) { |
||
87 | case constants.SYNC_MESSAGE_TOPIC: |
||
88 | onMQTTSyncDataReceived(packet.topic, packet.payload) |
||
89 | break |
||
90 | case constants.SYNC_PEERS_TOPIC: |
||
91 | onMQTTPeerDiscoveryMessageReceived(packet.topic, packet.payload) |
||
92 | break |
||
93 | } |
||
94 | }) |
||
95 | |||
96 | // Distribute peers periodically. |
||
97 | setInterval(distributePeers, config.distribution.peerSyncInterval) |
||
98 | |||
99 | function addMqttPeer(server) { |
||
100 | // Check whether the server has already been added. |
||
101 | if (mqttClients.some(function (client) { return client.options.properties.userProperties.server === server })) |
||
102 | return |
||
103 | |||
104 | // Establish connections to all MQTT servers defined in the peer list. |
||
105 | const client = mqtt.connect('mqtt://' + server, { |
||
106 | connectTimeout: config.distribution.timeout, |
||
107 | port: config.distribution.port, |
||
108 | reconnectPeriod: config.distribution.reconnect, |
||
109 | properties: { |
||
110 | userProperties: { |
||
111 | server: server |
||
112 | } |
||
113 | } |
||
114 | }) |
||
115 | client.on('error', function (error) { |
||
116 | logger.log('error', 'connection to server %s failed with error %s', server, error) |
||
117 | }).on('connect', function (connack) { |
||
118 | logger.log('info', 'connected to server %s', server) |
||
119 | }).on('reconnect', function () { |
||
120 | logger.log('info', 'reconnecting to server %s', server) |
||
121 | }) |
||
122 | |||
123 | mqttClients.push(client) |
||
124 | } |
||
125 | |||
126 | function distributePeers() { |
||
127 | // Serialize the payload. |
||
128 | const message = JSON.stringify(peers) |
||
129 | |||
130 | // Encrypt the payload. |
||
131 | var encrypted = "" |
||
132 | try { |
||
133 | encrypted = symmetric.encrypt(message, config.secret) |
||
134 | } |
||
135 | catch (error) { |
||
136 | logger.log('warn', 'encrypting payload %s failed with error %s', message, error) |
||
137 | return |
||
138 | } |
||
139 | |||
140 | // Publish message for each configured peer. |
||
141 | mqttClients.forEach(function (client, index) { |
||
142 | client.publish(constants.SYNC_PEERS_TOPIC, |
||
143 | encrypted, |
||
144 | { qos: 0, retain: false, dup: false }, |
||
145 | function (error) { |
||
146 | if (error) { |
||
147 | logger.log('error', 'could not publish message %s', error) |
||
148 | return |
||
149 | } |
||
150 | logger.log('info', 'published peers to %s', |
||
151 | client.options.properties.userProperties.server |
||
152 | ) |
||
153 | }) |
||
154 | }) |
||
155 | } |
||
156 | |||
157 | // Handle peer discovery messages. |
||
158 | function onMQTTPeerDiscoveryMessageReceived(topic, message) { |
||
159 | // Decrypt the payload. |
||
160 | var decrypted = "" |
||
161 | try { |
||
162 | decrypted = symmetric.decrypt(message, config.secret) |
||
163 | } |
||
164 | catch (error) { |
||
165 | logger.log('warn', 'decrypting payload %s failed with error %s', message, error) |
||
166 | return |
||
167 | } |
||
168 | |||
169 | // Create a list of new peers. |
||
170 | var syncPeers = JSON.parse(decrypted).filter(function (peer) { |
||
171 | return peers.indexOf(peer) === -1 |
||
172 | }) |
||
173 | |||
174 | // No new peers have been discovered so no need to add them. |
||
175 | if (syncPeers.length === 0) |
||
176 | return |
||
177 | |||
178 | // Add tnew peers to list of peers. |
||
179 | syncPeers.forEach(function (peer, index) { |
||
180 | // Add new peer to the pool of MQTT peers. |
||
181 | addMqttPeer(peer) |
||
182 | |||
183 | // Store the peer. |
||
184 | peers.push(peer) |
||
185 | }) |
||
186 | |||
187 | // Write the new list of peers. |
||
188 | fs.writeFile(config.peers, peers.join(os.EOL), function (error) { |
||
189 | if (error) { |
||
190 | logger.log('error', 'writing peer list file failed with error %s', error) |
||
191 | } |
||
192 | }) |
||
193 | } |
||
194 | |||
195 | // Handle socket message. |
||
196 | function onSocketSyncDataReceived(data) { |
||
197 | // Serialize payload. |
||
198 | var syncMessage |
||
199 | try { |
||
200 | syncMessage = SyncData.fromJSON(data) |
||
201 | } catch (error) { |
||
202 | // Not a sync message so ignore it. |
||
203 | return |
||
204 | } |
||
205 | |||
206 | // Serialize the payload. |
||
207 | const message = JSON.stringify(syncMessage) |
||
208 | |||
209 | // Encrypt the payload. |
||
210 | var encrypted = "" |
||
211 | try { |
||
212 | encrypted = symmetric.encrypt(message, config.secret) |
||
213 | } |
||
214 | catch (error) { |
||
215 | logger.log('warn', 'encrypting payload %s failed with error %s', message, error) |
||
216 | return |
||
217 | } |
||
218 | |||
219 | // Publish message for each configured peer. |
||
220 | mqttClients.forEach(function (client, index) { |
||
221 | client.publish(constants.SYNC_MESSAGE_TOPIC, |
||
222 | encrypted, |
||
223 | { qos: 0, retain: false, dup: false }, |
||
224 | function (error) { |
||
225 | if (error) { |
||
226 | logger.log('error', 'could not publish message %s', error) |
||
227 | return |
||
228 | } |
||
229 | logger.log('info', 'published %s %s to %s', |
||
230 | syncMessage.action, |
||
231 | syncMessage.ip, |
||
232 | client.options.properties.userProperties.server |
||
233 | ) |
||
234 | }) |
||
235 | }) |
||
236 | } |
||
237 | |||
238 | // Handle synchronization messages. |
||
239 | function onMQTTSyncDataReceived(topic, message) { |
||
240 | // Send to the metrics server if it has been enabled. |
||
241 | if (config.metrics.enable) |
||
242 | sendStatistics(message) |
||
243 | |||
244 | // Decrypt the payload. |
||
245 | var decrypted = "" |
||
246 | try { |
||
247 | decrypted = symmetric.decrypt(message, config.secret) |
||
248 | } |
||
249 | catch (error) { |
||
250 | logger.log('warn', 'decrypting payload %s failed with error %s', message, error) |
||
251 | return |
||
252 | } |
||
253 | |||
254 | // Deserialize the payload. |
||
255 | var syncMessage |
||
256 | try { |
||
257 | syncMessage = SyncData.fromJSON(decrypted) |
||
258 | } catch (error) { |
||
259 | // Not a sync message so ignore it. |
||
260 | return |
||
261 | } |
||
262 | |||
263 | logger.log('info', 'received payload %s from %s', JSON.stringify(syncMessage), syncMessage.peer) |
||
264 | |||
265 | // Build the path to the action directory. |
||
266 | const actionDirectory = path.join(config.action_directory, syncMessage.action + '.d') |
||
267 | // Check that the action directory exists. |
||
268 | fs.stat(actionDirectory, function (error, stats) { |
||
269 | if (error) { |
||
270 | logger.log('error', 'action directory %s for action %s does not exist', actionDirectory, syncMessage.action) |
||
271 | return |
||
272 | } |
||
273 | |||
274 | const executePayload = syncMessage.toArray() |
||
275 | |||
276 | // Execute each file in the action directory. |
||
277 | fs.readdir(actionDirectory, function (error, files) { |
||
278 | if (error) { |
||
279 | logger.log('error', 'could not enumerate files in action directory %s', actionDirectory) |
||
280 | return |
||
281 | } |
||
282 | |||
283 | files.forEach(function (file) { |
||
284 | // Create a copy of the payload and prepend the file to execute. |
||
285 | var command = executePayload |
||
286 | command.unshift(path.join(actionDirectory, file)) |
||
287 | command = executePayload.join(' ') |
||
288 | |||
289 | logger.log('info', 'executing script with parameters %s', command) |
||
290 | |||
291 | // Execute the command. |
||
292 | shell.exec(command, { async: true, silent: true }) |
||
293 | }) |
||
294 | }) |
||
295 | }) |
||
296 | } |
||
297 | |||
298 | function sendStatistics(message) { |
||
299 | const stream = net.createConnection({ |
||
300 | port: config.metrics.port, |
||
301 | host: config.metrics.host, |
||
302 | timeout: config.metrics.timeout |
||
303 | }, function () { |
||
304 | stream.write(message) |
||
305 | stream.end() |
||
306 | }).on('error', function (error) { |
||
307 | logger.warn('could not connect to the metrics server') |
||
308 | }) |
||
309 | } |