corrade-elasticsearch-heartbeat – Rev 1
?pathlinks?
#!/usr/bin/env nodejs
///////////////////////////////////////////////////////////////////////////
// Copyright (C) 2019 Wizardry and Steamworks - License: CC BY 2.0 //
///////////////////////////////////////////////////////////////////////////
const mqtt = require('mqtt')
const YAML = require('yamljs')
const { createLogger, format, transports } = require('winston')
const { Client } = require('@elastic/elasticsearch')
const qs = require('qs')
const path = require('path')
const fs = require('fs')
const was = require('was.js')
const moment = require('moment')
// Load configuration file.
const config = YAML.load('config.yml')
// Set up logger.
const logger = createLogger({
format: format.combine(
format.splat(),
format.simple()
),
transports: [
new transports.Console({
timestamp: true
}),
new transports.File(
{
timestamp: true,
filename: path.join(path.dirname(fs.realpathSync(__filename)), "log/corrade-elasticsearch-heartbeat.log")
}
)
]
})
// initialize the elasticsearch client
const client = new Client(
{
node: config.elasticsearch.connect,
log: 'trace'
}
)
// attempt to ping elasticsearch
client.ping({}, function (error, response) {
if (error) {
logger.error(`elasticsearch is down ${JSON.stringify(error)} ${JSON.stringify(response)}`)
return
}
logger.info('elasticsearch is up')
// create the index
client.indices.create({
index: config.elasticsearch.index,
}, function (error, response, status) {
if (error && error.meta.body.error.type !== 'resource_already_exists_exception') {
logger.error(`could not create elasticsearch index ${error}`)
return
}
if (error && error.meta.body.error.type === 'resource_already_exists_exception') {
logger.info('elasticsearch index already exists')
}
// Subscribe to Corrade MQTT.
const mqttClient = mqtt.connect(config.corrade.mqtt)
mqttClient.on('reconnect', () => {
logger.info('Reconnecting to Corrade MQTT server...')
})
mqttClient.on('connect', () => {
logger.info('Connected to Corrade MQTT server.')
// Subscribe to group message notifications with group name and password.
mqttClient.subscribe(`${config.corrade.group}/${config.corrade.password}/heartbeat`, (error) => {
if (error) {
logger.info('Error subscribing to Corrade MQTT group messages.')
return
}
logger.info('Subscribed to Corrade MQTT group messages.')
})
})
mqttClient.on('error', (error) => {
logger.error(`Error found while connecting to Corrade MQTT: ${error}`)
})
mqttClient.on('message', (topic, message) => {
// Make an object out of the notification.
let notification = qs.parse(message.toString())
// Check the notification parameters for sanity.
if (typeof notification.type === 'undefined' ||
notification.type !== 'heartbeat')
return
var beat = {};
let data = was.formats.csv.CSVToArray(notification.data)
data.forEach((value, index) => {
if (index % 2 !== 0)
return
beat[value] = data[index + 1]
})
// Convert parameters to JavaScript types.
beat.StartTime = moment(beat.StartTime).toDate()
beat.AverageCPUUsage = parseInt(beat.AverageCPUUsage)
beat.AverageRAMUsage = parseInt(beat.AverageRAMUsage)
beat.AverageThreadsUsage = parseInt(beat.AverageThreadsUsage)
beat.ExecutingCommands = parseInt(beat.ExecutingCommands)
beat.ManifestingRLVBehaviours = parseInt(beat.ManifestingRLVBehaviours)
beat.ProcessedCommands = parseInt(beat.ProcessedCommands)
beat.ProcessedRLVBehaviours = parseInt(beat.ProcessedRLVBehaviours)
// Add extra parameters.
beat.AgentName = config.corrade.name
beat.TimeNow = moment().toDate()
client.index({
index: config.elasticsearch.index,
body: beat
}, function (err, resp, status) {
if (err) {
logger.warn(`error whilst sending heartbeat data ${err}`)
}
})
})
})
})