corrade-elasticsearch-heartbeat – Rev 1

Subversion Repositories:
Rev:
#!/usr/bin/env nodejs
///////////////////////////////////////////////////////////////////////////
//    Copyright (C) 2019 Wizardry and Steamworks - License: CC BY 2.0    //
///////////////////////////////////////////////////////////////////////////

const mqtt = require('mqtt')
const YAML = require('yamljs')
const { createLogger, format, transports } = require('winston')
const { Client } = require('@elastic/elasticsearch')
const qs = require('qs')
const path = require('path')
const fs = require('fs')
const was = require('was.js')
const moment = require('moment')

// Load configuration file.
const config = YAML.load('config.yml')

// Set up logger.
const logger = createLogger({
    format: format.combine(
        format.splat(),
        format.simple()
    ),
    transports: [
        new transports.Console({
            timestamp: true
        }),
        new transports.File(
            {
                timestamp: true,
                filename: path.join(path.dirname(fs.realpathSync(__filename)), "log/corrade-elasticsearch-heartbeat.log")
            }
        )
    ]
})

// initialize the elasticsearch client
const client = new Client(
    {
        node: config.elasticsearch.connect,
        log: 'trace'
    }
)

// attempt to ping elasticsearch
client.ping({}, function (error, response) {
    if (error) {
        logger.error(`elasticsearch is down ${JSON.stringify(error)} ${JSON.stringify(response)}`)
        return
    }

    logger.info('elasticsearch is up')

    // create the index
    client.indices.create({
        index: config.elasticsearch.index,
    }, function (error, response, status) {
        if (error && error.meta.body.error.type !== 'resource_already_exists_exception') {
            logger.error(`could not create elasticsearch index ${error}`)
            return
        }

        if (error && error.meta.body.error.type === 'resource_already_exists_exception') {
            logger.info('elasticsearch index already exists')
        }


        // Subscribe to Corrade MQTT.
        const mqttClient = mqtt.connect(config.corrade.mqtt)

        mqttClient.on('reconnect', () => {
            logger.info('Reconnecting to Corrade MQTT server...')
        })

        mqttClient.on('connect', () => {
            logger.info('Connected to Corrade MQTT server.')
            // Subscribe to group message notifications with group name and password.
            mqttClient.subscribe(`${config.corrade.group}/${config.corrade.password}/heartbeat`, (error) => {
                if (error) {
                    logger.info('Error subscribing to Corrade MQTT group messages.')
                    return
                }

                logger.info('Subscribed to Corrade MQTT group messages.')
            })
        })

        mqttClient.on('error', (error) => {
            logger.error(`Error found while connecting to Corrade MQTT: ${error}`)
        })

        mqttClient.on('message', (topic, message) => {
            // Make an object out of the notification.
            let notification = qs.parse(message.toString())

            // Check the notification parameters for sanity.
            if (typeof notification.type === 'undefined' ||
                notification.type !== 'heartbeat')
                return

            var beat = {};
            let data = was.formats.csv.CSVToArray(notification.data)
            data.forEach((value, index) => {
                if (index % 2 !== 0)
                    return

                beat[value] = data[index + 1]

            })

            // Convert parameters to JavaScript types.
            beat.StartTime = moment(beat.StartTime).toDate()
            beat.AverageCPUUsage = parseInt(beat.AverageCPUUsage)
            beat.AverageRAMUsage = parseInt(beat.AverageRAMUsage)
            beat.AverageThreadsUsage = parseInt(beat.AverageThreadsUsage)
            beat.ExecutingCommands = parseInt(beat.ExecutingCommands)
            beat.ManifestingRLVBehaviours = parseInt(beat.ManifestingRLVBehaviours)
            beat.ProcessedCommands = parseInt(beat.ProcessedCommands)
            beat.ProcessedRLVBehaviours = parseInt(beat.ProcessedRLVBehaviours)

            // Add extra parameters.
            beat.AgentName = config.corrade.name
            beat.TimeNow = moment().toDate()

            client.index({
                index: config.elasticsearch.index,
                body: beat
            }, function (err, resp, status) {
                if (err) {
                    logger.warn(`error whilst sending heartbeat data ${err}`)
                }
            })
        })
    })
})