fst – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env node
2  
3 // Package imports.
4 const os = require('os')
5 const readline = require('readline')
6 const SyncData = require('./SyncData.js')
7 const constants = require('./constants.js')
8 const symmetric = require('./symmetric.js')
9 const fs = require('fs')
10 const path = require('path')
11 const YAML = require('yamljs')
12 const { createLogger, format, transports } = require('winston')
13 const mqtt = require('mqtt')
14 const mosca = require('mosca')
15 const shell = require('shelljs')
16 const net = require('net')
17  
// Module-level state shared by the handlers below:
//   mqttClients - one MQTT client per peer, appended by addMqttPeer
//   peers       - peer entries read from the peer list or learned via discovery
const mqttClients = []
const peers = []

// Load the configuration file. config.yml is looked up in the directory of the
// script Node was started with (require.main), not in the working directory.
const config = YAML.load(path.join(path.dirname(require.main.filename), 'config.yml'))
31  
// Set up the logger: splat() enables the '%s' placeholders used by the
// logger.log calls in this file, simple() renders the final line. Every entry
// goes both to the console and to the file named by config.log.
const logFormat = format.combine(format.splat(), format.simple())
const logTransports = [
    new transports.Console(),
    new transports.File({ filename: config.log })
]
const logger = createLogger({ format: logFormat, transports: logTransports })
43  
// Read the peer list: every line of the file named by config.peers is one peer.
readline.createInterface({
    input: fs.createReadStream(config.peers),
    terminal: false
}).on('line', function (line) {
    // The list may contain padding or blank lines; an empty entry would
    // otherwise be stored as a peer and dialled as the bare URL 'mqtt://'.
    const server = line.trim()
    if (server.length === 0)
        return

    // A repeated entry must not be stored twice, otherwise the duplicate is
    // written back the next time the peer list is saved.
    if (peers.indexOf(server) !== -1)
        return

    // Remember the peer for later discovery usage.
    peers.push(server)

    // Add the MQTT server to the pool and connect.
    addMqttPeer(server)
})
55  
// Create the socket server to listen for payloads. Every chunk received on an
// accepted connection is handed to onSocketSyncDataReceived.
const socketServer = net.createServer(function (stream) {
    stream.on('data', onSocketSyncDataReceived)

    // A connection that fails emits 'error'; without a listener that event is
    // thrown and would take the whole process down.
    stream.on('error', function (error) {
        logger.log('warn', 'socket connection failed with error %s', error)
    })
}).on('listening', function () {
    logger.info('Socket server listening')
}).on('error', function (error) {
    // Only a socket path that is already in use can be recovered from here;
    // report every other server error rather than dropping it silently.
    if (error.code !== 'EADDRINUSE') {
        logger.log('error', 'socket server failed with error %s', error)
        return
    }

    logger.warn('socket in use, cleaning up...')

    // Wait a second, remove the existing socket file and try to listen again.
    setTimeout(function () {
        fs.unlink(config.distribution.socket, function (error) {
            if (error) {
                logger.error('unable to remove socket')
                return
            }
            logger.info('removed stale socket')
            socketServer.listen(config.distribution.socket)
        })
    }, 1000)
})

// Listen on the configured socket.
socketServer.listen(config.distribution.socket)
79  
// Create the MQTT server on the configured distribution port.
const mqttServer = new mosca.Server({ port: config.distribution.port })

mqttServer.on('ready', () => {
    logger.info('MQTT server listening')
})

// Route published packets by topic; packets on any other topic are ignored.
mqttServer.on('published', (packet) => {
    const topic = packet.topic

    if (topic === constants.SYNC_MESSAGE_TOPIC) {
        onMQTTSyncDataReceived(topic, packet.payload)
    } else if (topic === constants.SYNC_PEERS_TOPIC) {
        onMQTTPeerDiscoveryMessageReceived(topic, packet.payload)
    }
})

// Distribute peers periodically.
setInterval(distributePeers, config.distribution.peerSyncInterval)
98  
/**
 * Open an MQTT client connection to a peer and add it to the client pool.
 *
 * The peer entry is stored in the client's connect options
 * (properties.userProperties.server) so the pool can later be searched by
 * peer. Nothing happens when the pool already holds a client for the entry.
 *
 * @param {string} server Peer entry; prefixed with 'mqtt://' to form the URL.
 */
function addMqttPeer(server) {
    // Skip servers that are already part of the pool.
    const alreadyPooled = mqttClients.some(
        (pooled) => pooled.options.properties.userProperties.server === server
    )
    if (alreadyPooled) {
        return
    }

    // Timeout, port and reconnect period all come from the distribution
    // section of the configuration.
    const connectOptions = {
        connectTimeout: config.distribution.timeout,
        port: config.distribution.port,
        reconnectPeriod: config.distribution.reconnect,
        properties: {
            userProperties: { server: server }
        }
    }
    const client = mqtt.connect('mqtt://' + server, connectOptions)

    // Connection lifecycle events are only logged.
    client.on('error', (error) => {
        logger.log('error', 'connection to server %s failed with error %s', server, error)
    })
    client.on('connect', () => {
        logger.log('info', 'connected to server %s', server)
    })
    client.on('reconnect', () => {
        logger.log('info', 'reconnecting to server %s', server)
    })

    mqttClients.push(client)
}
125  
/**
 * Publish the current peer list to every MQTT client in the pool.
 *
 * The list is serialized to JSON and encrypted with config.secret before it is
 * published on constants.SYNC_PEERS_TOPIC. If encryption fails a warning is
 * logged and nothing is published.
 */
function distributePeers() {
    const message = JSON.stringify(peers)

    let encrypted
    try {
        encrypted = symmetric.encrypt(message, config.secret)
    } catch (error) {
        logger.log('warn', 'encrypting payload %s failed with error %s', message, error)
        return
    }

    for (const client of mqttClients) {
        // Report the outcome of the publish for this particular peer.
        const onPublished = (error) => {
            if (error) {
                logger.log('error', 'could not publish message %s', error)
                return
            }
            logger.log('info', 'published peers to %s',
                client.options.properties.userProperties.server
            )
        }

        client.publish(
            constants.SYNC_PEERS_TOPIC,
            encrypted,
            { qos: 0, retain: false, dup: false },
            onPublished
        )
    }
}
156  
/**
 * Handle a peer discovery message: merge the announced peers into the local
 * peer list, connect to the new ones and save the updated list.
 *
 * The payload is decrypted with config.secret and must be a JSON array of
 * peer entries. Payloads that cannot be decrypted, are not valid JSON or are
 * not an array are logged and ignored.
 *
 * @param {string} topic   Topic the message was published on (unused).
 * @param {*}      message Encrypted payload, passed to symmetric.decrypt.
 */
function onMQTTPeerDiscoveryMessageReceived(topic, message) {
    // Decrypt the payload.
    let decrypted
    try {
        decrypted = symmetric.decrypt(message, config.secret)
    } catch (error) {
        logger.log('warn', 'decrypting payload %s failed with error %s', message, error)
        return
    }

    // The payload comes from the network, so a malformed message must not
    // throw out of this handler.
    let announced
    try {
        announced = JSON.parse(decrypted)
    } catch (error) {
        logger.log('warn', 'parsing peer list %s failed with error %s', decrypted, error)
        return
    }
    if (!Array.isArray(announced)) {
        logger.log('warn', 'ignoring peer list %s because it is not an array', decrypted)
        return
    }

    // Create a list of new peers: non-empty strings that are not known yet.
    // The index comparison keeps only the first occurrence of an entry that is
    // repeated within the same announcement.
    const syncPeers = announced.filter(function (peer, index) {
        return typeof peer === 'string' &&
            peer.length > 0 &&
            peers.indexOf(peer) === -1 &&
            announced.indexOf(peer) === index
    })

    // No new peers have been discovered so there is nothing to add or save.
    if (syncPeers.length === 0)
        return

    // Add the new peers to the list of peers.
    syncPeers.forEach(function (peer) {
        // Add new peer to the pool of MQTT peers.
        addMqttPeer(peer)

        // Store the peer.
        peers.push(peer)
    })

    // Write the new list of peers, one entry per line.
    fs.writeFile(config.peers, peers.join(os.EOL), function (error) {
        if (error) {
            logger.log('error', 'writing peer list file failed with error %s', error)
        }
    })
}
194  
/**
 * Handle data received on the local socket: forward a sync message to every
 * MQTT client in the pool.
 *
 * The data is parsed with SyncData.fromJSON; anything that does not parse is
 * silently ignored. The parsed message is serialized to JSON again, encrypted
 * with config.secret and published on constants.SYNC_MESSAGE_TOPIC.
 *
 * @param {*} data Chunk received from the socket stream.
 */
function onSocketSyncDataReceived(data) {
    // Deserialize the payload.
    let syncMessage
    try {
        syncMessage = SyncData.fromJSON(data)
    } catch (error) {
        // Not a sync message so ignore it.
        return
    }

    const message = JSON.stringify(syncMessage)

    let encrypted
    try {
        encrypted = symmetric.encrypt(message, config.secret)
    } catch (error) {
        logger.log('warn', 'encrypting payload %s failed with error %s', message, error)
        return
    }

    for (const client of mqttClients) {
        // Report the outcome of the publish for this particular peer.
        const onPublished = (error) => {
            if (error) {
                logger.log('error', 'could not publish message %s', error)
                return
            }
            logger.log('info', 'published %s %s to %s',
                syncMessage.action,
                syncMessage.ip,
                client.options.properties.userProperties.server
            )
        }

        client.publish(
            constants.SYNC_MESSAGE_TOPIC,
            encrypted,
            { qos: 0, retain: false, dup: false },
            onPublished
        )
    }
}
237  
/**
 * Handle a synchronization message received over MQTT.
 *
 * When metrics are enabled the raw (still encrypted) payload is first passed
 * to sendStatistics. The payload is then decrypted with config.secret and
 * parsed with SyncData.fromJSON. Every file found in the directory
 * <config.action_directory>/<action>.d is executed with the values of
 * syncMessage.toArray() as its arguments.
 *
 * @param {string} topic   Topic the message was published on (unused).
 * @param {*}      message Encrypted payload, passed to symmetric.decrypt.
 */
function onMQTTSyncDataReceived(topic, message) {
    // Send to the metrics server if it has been enabled.
    if (config.metrics.enable)
        sendStatistics(message)

    // Decrypt the payload.
    let decrypted
    try {
        decrypted = symmetric.decrypt(message, config.secret)
    } catch (error) {
        logger.log('warn', 'decrypting payload %s failed with error %s', message, error)
        return
    }

    // Deserialize the payload.
    let syncMessage
    try {
        syncMessage = SyncData.fromJSON(decrypted)
    } catch (error) {
        // Not a sync message so ignore it.
        return
    }

    logger.log('info', 'received payload %s from %s', JSON.stringify(syncMessage), syncMessage.peer)

    // Build the path to the action directory.
    // NOTE(review): the action name comes from the received message and is not
    // validated here - confirm SyncData rejects names containing path
    // separators or '..'.
    const actionDirectory = path.join(config.action_directory, syncMessage.action + '.d')

    // Check that the action directory exists.
    fs.stat(actionDirectory, function (error) {
        if (error) {
            logger.log('error', 'action directory %s for action %s does not exist', actionDirectory, syncMessage.action)
            return
        }

        const executePayload = syncMessage.toArray()

        // Execute each file in the action directory.
        fs.readdir(actionDirectory, function (error, files) {
            if (error) {
                logger.log('error', 'could not enumerate files in action directory %s', actionDirectory)
                return
            }

            files.forEach(function (file) {
                // Build a fresh argument list for every script. Prepending to
                // the shared payload array would make each script inherit the
                // paths of all scripts handled before it.
                const command = [path.join(actionDirectory, file)]
                    .concat(executePayload)
                    .join(' ')

                logger.log('info', 'executing script with parameters %s', command)

                // Execute the command.
                // NOTE(review): the command line is assembled from message
                // fields and interpreted by a shell without any quoting;
                // confirm that the fields are validated, or switch to an
                // argument-vector API such as child_process.execFile.
                shell.exec(command, { async: true, silent: true })
            })
        })
    })
}
297  
/**
 * Forward a payload to the metrics server.
 *
 * Opens a TCP connection to config.metrics.host:config.metrics.port, writes
 * the message once connected and closes the connection. Failures are logged
 * as warnings and otherwise ignored.
 *
 * @param {*} message Payload written to the metrics server as-is.
 */
function sendStatistics(message) {
    const stream = net.createConnection({
        port: config.metrics.port,
        host: config.metrics.host,
        timeout: config.metrics.timeout
    }, function () {
        stream.write(message)
        stream.end()
    })

    stream.on('error', function () {
        logger.warn('could not connect to the metrics server')
    })

    // The timeout option only makes the socket emit 'timeout'; the socket is
    // not closed on its own, so a stalled connection has to be destroyed here.
    stream.on('timeout', function () {
        logger.warn('connection to the metrics server timed out')
        stream.destroy()
    })
}