fst – Rev 1
?pathlinks?
#!/usr/bin/env node
// Package imports.
const os = require('os')
const readline = require('readline')
const SyncData = require('./SyncData.js')
const constants = require('./constants.js')
const symmetric = require('./symmetric.js')
const fs = require('fs')
const path = require('path')
const YAML = require('yamljs')
const { createLogger, format, transports } = require('winston')
const mqtt = require('mqtt')
const mosca = require('mosca')
const shell = require('shelljs')
const net = require('net')
// Local variables.
const mqttClients = []
const peers = []
// Load configuration file.
const config = YAML.load(
path.join(
path.dirname(
require.main.filename
),
'config.yml'
)
)
// Set up logger.
const logger = createLogger({
format: format.combine(
format.splat(),
format.simple()
),
transports: [
new transports.Console(),
new transports.File({ filename: config.log })
]
})
// Read peer list.
readline.createInterface({
input: fs.createReadStream(config.peers),
terminal: false
}).on('line', function (server) {
// Push the peers for later discovery usage.
peers.push(server)
// Add the the MQTT server to the pool and connect.
addMqttPeer(server)
})
// Create the socket server to listen for payloads.
var socketServer = net.createServer(function (stream) {
stream.on('data', onSocketSyncDataReceived)
}).on('listening', function () {
logger.info('Socket server listening')
}).on('error', function (error) {
if (error.code == 'EADDRINUSE') {
logger.warn('socket in use, cleaning up...')
setTimeout(function () {
fs.unlink(config.distribution.socket, function (error) {
if (error) {
logger.error('unable to remove socket')
return
}
logger.info('removed stale socket')
socketServer.listen(config.distribution.socket)
})
}, 1000)
}
})
// Listen on the configured socket.
socketServer.listen(config.distribution.socket)
// Create the MQTT server.
var mqttServer = new mosca.Server({
port: config.distribution.port
}).on('ready', function () {
logger.info('MQTT server listening')
}).on('published', function (packet, client) {
switch (packet.topic) {
case constants.SYNC_MESSAGE_TOPIC:
onMQTTSyncDataReceived(packet.topic, packet.payload)
break
case constants.SYNC_PEERS_TOPIC:
onMQTTPeerDiscoveryMessageReceived(packet.topic, packet.payload)
break
}
})
// Distribute peers periodically.
setInterval(distributePeers, config.distribution.peerSyncInterval)
function addMqttPeer(server) {
// Check whether the server has already been added.
if (mqttClients.some(function (client) { return client.options.properties.userProperties.server === server }))
return
// Establish connections to all MQTT servers defined in the peer list.
const client = mqtt.connect('mqtt://' + server, {
connectTimeout: config.distribution.timeout,
port: config.distribution.port,
reconnectPeriod: config.distribution.reconnect,
properties: {
userProperties: {
server: server
}
}
})
client.on('error', function (error) {
logger.log('error', 'connection to server %s failed with error %s', server, error)
}).on('connect', function (connack) {
logger.log('info', 'connected to server %s', server)
}).on('reconnect', function () {
logger.log('info', 'reconnecting to server %s', server)
})
mqttClients.push(client)
}
function distributePeers() {
// Serialize the payload.
const message = JSON.stringify(peers)
// Encrypt the payload.
var encrypted = ""
try {
encrypted = symmetric.encrypt(message, config.secret)
}
catch (error) {
logger.log('warn', 'encrypting payload %s failed with error %s', message, error)
return
}
// Publish message for each configured peer.
mqttClients.forEach(function (client, index) {
client.publish(constants.SYNC_PEERS_TOPIC,
encrypted,
{ qos: 0, retain: false, dup: false },
function (error) {
if (error) {
logger.log('error', 'could not publish message %s', error)
return
}
logger.log('info', 'published peers to %s',
client.options.properties.userProperties.server
)
})
})
}
// Handle peer discovery messages.
function onMQTTPeerDiscoveryMessageReceived(topic, message) {
// Decrypt the payload.
var decrypted = ""
try {
decrypted = symmetric.decrypt(message, config.secret)
}
catch (error) {
logger.log('warn', 'decrypting payload %s failed with error %s', message, error)
return
}
// Create a list of new peers.
var syncPeers = JSON.parse(decrypted).filter(function (peer) {
return peers.indexOf(peer) === -1
})
// No new peers have been discovered so no need to add them.
if (syncPeers.length === 0)
return
// Add tnew peers to list of peers.
syncPeers.forEach(function (peer, index) {
// Add new peer to the pool of MQTT peers.
addMqttPeer(peer)
// Store the peer.
peers.push(peer)
})
// Write the new list of peers.
fs.writeFile(config.peers, peers.join(os.EOL), function (error) {
if (error) {
logger.log('error', 'writing peer list file failed with error %s', error)
}
})
}
// Handle socket message.
function onSocketSyncDataReceived(data) {
// Serialize payload.
var syncMessage
try {
syncMessage = SyncData.fromJSON(data)
} catch (error) {
// Not a sync message so ignore it.
return
}
// Serialize the payload.
const message = JSON.stringify(syncMessage)
// Encrypt the payload.
var encrypted = ""
try {
encrypted = symmetric.encrypt(message, config.secret)
}
catch (error) {
logger.log('warn', 'encrypting payload %s failed with error %s', message, error)
return
}
// Publish message for each configured peer.
mqttClients.forEach(function (client, index) {
client.publish(constants.SYNC_MESSAGE_TOPIC,
encrypted,
{ qos: 0, retain: false, dup: false },
function (error) {
if (error) {
logger.log('error', 'could not publish message %s', error)
return
}
logger.log('info', 'published %s %s to %s',
syncMessage.action,
syncMessage.ip,
client.options.properties.userProperties.server
)
})
})
}
// Handle synchronization messages.
function onMQTTSyncDataReceived(topic, message) {
// Send to the metrics server if it has been enabled.
if (config.metrics.enable)
sendStatistics(message)
// Decrypt the payload.
var decrypted = ""
try {
decrypted = symmetric.decrypt(message, config.secret)
}
catch (error) {
logger.log('warn', 'decrypting payload %s failed with error %s', message, error)
return
}
// Deserialize the payload.
var syncMessage
try {
syncMessage = SyncData.fromJSON(decrypted)
} catch (error) {
// Not a sync message so ignore it.
return
}
logger.log('info', 'received payload %s from %s', JSON.stringify(syncMessage), syncMessage.peer)
// Build the path to the action directory.
const actionDirectory = path.join(config.action_directory, syncMessage.action + '.d')
// Check that the action directory exists.
fs.stat(actionDirectory, function (error, stats) {
if (error) {
logger.log('error', 'action directory %s for action %s does not exist', actionDirectory, syncMessage.action)
return
}
const executePayload = syncMessage.toArray()
// Execute each file in the action directory.
fs.readdir(actionDirectory, function (error, files) {
if (error) {
logger.log('error', 'could not enumerate files in action directory %s', actionDirectory)
return
}
files.forEach(function (file) {
// Create a copy of the payload and prepend the file to execute.
var command = executePayload
command.unshift(path.join(actionDirectory, file))
command = executePayload.join(' ')
logger.log('info', 'executing script with parameters %s', command)
// Execute the command.
shell.exec(command, { async: true, silent: true })
})
})
})
}
function sendStatistics(message) {
const stream = net.createConnection({
port: config.metrics.port,
host: config.metrics.host,
timeout: config.metrics.timeout
}, function () {
stream.write(message)
stream.end()
}).on('error', function (error) {
logger.warn('could not connect to the metrics server')
})
}