fst – Rev 1

Subversion Repositories:
Rev:
#!/usr/bin/env node

// Package imports.
const os = require('os')
const readline = require('readline')
const SyncData = require('./SyncData.js')
const constants = require('./constants.js')
const symmetric = require('./symmetric.js')
const fs = require('fs')
const path = require('path')
const YAML = require('yamljs')
const { createLogger, format, transports } = require('winston')
const mqtt = require('mqtt')
const mosca = require('mosca')
const shell = require('shelljs')
const net = require('net')

// Local variables.
const mqttClients = []
const peers = []

// Load configuration file.
const config = YAML.load(
  path.join(
    path.dirname(
      require.main.filename
    ),
    'config.yml'
  )
)

// Set up logger.
const logger = createLogger({
  format: format.combine(
    format.splat(),
    format.simple()
  ),
  transports: [
    new transports.Console(),
    new transports.File({ filename: config.log })
  ]
})

// Read peer list.
readline.createInterface({
  input: fs.createReadStream(config.peers),
  terminal: false
}).on('line', function (server) {
  // Push the peers for later discovery usage.
  peers.push(server)

  // Add the the MQTT server to the pool and connect.
  addMqttPeer(server)
})

// Create the socket server to listen for payloads.
var socketServer = net.createServer(function (stream) {
  stream.on('data', onSocketSyncDataReceived)
}).on('listening', function () {
  logger.info('Socket server listening')
}).on('error', function (error) {
  if (error.code == 'EADDRINUSE') {
    logger.warn('socket in use, cleaning up...')
    setTimeout(function () {
      fs.unlink(config.distribution.socket, function (error) {
        if (error) {
          logger.error('unable to remove socket')
          return
        }
        logger.info('removed stale socket')
        socketServer.listen(config.distribution.socket)
      })
    }, 1000)
  }
})

// Listen on the configured socket.
socketServer.listen(config.distribution.socket)

// Create the MQTT server.
var mqttServer = new mosca.Server({
  port: config.distribution.port
}).on('ready', function () {
  logger.info('MQTT server listening')
}).on('published', function (packet, client) {
  switch (packet.topic) {
    case constants.SYNC_MESSAGE_TOPIC:
      onMQTTSyncDataReceived(packet.topic, packet.payload)
      break
    case constants.SYNC_PEERS_TOPIC:
      onMQTTPeerDiscoveryMessageReceived(packet.topic, packet.payload)
      break
  }
})

// Distribute peers periodically.
setInterval(distributePeers, config.distribution.peerSyncInterval)

function addMqttPeer(server) {
  // Check whether the server has already been added.
  if (mqttClients.some(function (client) { return client.options.properties.userProperties.server === server }))
    return

  // Establish connections to all MQTT servers defined in the peer list.
  const client = mqtt.connect('mqtt://' + server, {
    connectTimeout: config.distribution.timeout,
    port: config.distribution.port,
    reconnectPeriod: config.distribution.reconnect,
    properties: {
      userProperties: {
        server: server
      }
    }
  })
  client.on('error', function (error) {
    logger.log('error', 'connection to server %s failed with error %s', server, error)
  }).on('connect', function (connack) {
    logger.log('info', 'connected to server %s', server)
  }).on('reconnect', function () {
    logger.log('info', 'reconnecting to server %s', server)
  })

  mqttClients.push(client)
}

function distributePeers() {
  // Serialize the payload.
  const message = JSON.stringify(peers)

  // Encrypt the payload.
  var encrypted = ""
  try {
    encrypted = symmetric.encrypt(message, config.secret)
  }
  catch (error) {
    logger.log('warn', 'encrypting payload %s failed with error %s', message, error)
    return
  }

  // Publish message for each configured peer.
  mqttClients.forEach(function (client, index) {
    client.publish(constants.SYNC_PEERS_TOPIC,
      encrypted,
      { qos: 0, retain: false, dup: false },
      function (error) {
        if (error) {
          logger.log('error', 'could not publish message %s', error)
          return
        }
        logger.log('info', 'published peers to %s',
          client.options.properties.userProperties.server
        )
      })
  })
}

// Handle peer discovery messages.
function onMQTTPeerDiscoveryMessageReceived(topic, message) {
  // Decrypt the payload.
  var decrypted = ""
  try {
    decrypted = symmetric.decrypt(message, config.secret)
  }
  catch (error) {
    logger.log('warn', 'decrypting payload %s failed with error %s', message, error)
    return
  }

  // Create a list of new peers.
  var syncPeers = JSON.parse(decrypted).filter(function (peer) {
    return peers.indexOf(peer) === -1
  })

  // No new peers have been discovered so no need to add them.
  if (syncPeers.length === 0)
    return

  // Add tnew peers to list of peers.
  syncPeers.forEach(function (peer, index) {
    // Add new peer to the pool of MQTT peers.
    addMqttPeer(peer)

    // Store the peer.
    peers.push(peer)
  })

  // Write the new list of peers.
  fs.writeFile(config.peers, peers.join(os.EOL), function (error) {
    if (error) {
      logger.log('error', 'writing peer list file failed with error %s', error)
    }
  })
}

// Handle socket message.
function onSocketSyncDataReceived(data) {
  // Serialize payload.
  var syncMessage
  try {
    syncMessage = SyncData.fromJSON(data)
  } catch (error) {
    // Not a sync message so ignore it.
    return
  }

  // Serialize the payload.
  const message = JSON.stringify(syncMessage)

  // Encrypt the payload.
  var encrypted = ""
  try {
    encrypted = symmetric.encrypt(message, config.secret)
  }
  catch (error) {
    logger.log('warn', 'encrypting payload %s failed with error %s', message, error)
    return
  }

  // Publish message for each configured peer.
  mqttClients.forEach(function (client, index) {
    client.publish(constants.SYNC_MESSAGE_TOPIC,
      encrypted,
      { qos: 0, retain: false, dup: false },
      function (error) {
        if (error) {
          logger.log('error', 'could not publish message %s', error)
          return
        }
        logger.log('info', 'published %s %s to %s',
          syncMessage.action,
          syncMessage.ip,
          client.options.properties.userProperties.server
        )
      })
  })
}

// Handle synchronization messages.
function onMQTTSyncDataReceived(topic, message) {
  // Send to the metrics server if it has been enabled.
  if (config.metrics.enable)
    sendStatistics(message)

  // Decrypt the payload.
  var decrypted = ""
  try {
    decrypted = symmetric.decrypt(message, config.secret)
  }
  catch (error) {
    logger.log('warn', 'decrypting payload %s failed with error %s', message, error)
    return
  }

  // Deserialize the payload.
  var syncMessage
  try {
    syncMessage = SyncData.fromJSON(decrypted)
  } catch (error) {
    // Not a sync message so ignore it.
    return
  }

  logger.log('info', 'received payload %s from %s', JSON.stringify(syncMessage), syncMessage.peer)

  // Build the path to the action directory.
  const actionDirectory = path.join(config.action_directory, syncMessage.action + '.d')
  // Check that the action directory exists.
  fs.stat(actionDirectory, function (error, stats) {
    if (error) {
      logger.log('error', 'action directory %s for action %s does not exist', actionDirectory, syncMessage.action)
      return
    }

    const executePayload = syncMessage.toArray()

    // Execute each file in the action directory.
    fs.readdir(actionDirectory, function (error, files) {
      if (error) {
        logger.log('error', 'could not enumerate files in action directory %s', actionDirectory)
        return
      }

      files.forEach(function (file) {
        // Create a copy of the payload and prepend the file to execute.
        var command = executePayload
        command.unshift(path.join(actionDirectory, file))
        command = executePayload.join(' ')

        logger.log('info', 'executing script with parameters %s', command)

        // Execute the command.
        shell.exec(command, { async: true, silent: true })
      })
    })
  })
}

function sendStatistics(message) {
  const stream = net.createConnection({
    port: config.metrics.port,
    host: config.metrics.host,
    timeout: config.metrics.timeout
  }, function () {
    stream.write(message)
    stream.end()
  }).on('error', function (error) {
    logger.warn('could not connect to the metrics server')
  })
}