corrade-elasticsearch-heartbeat – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env nodejs
2 ///////////////////////////////////////////////////////////////////////////
3 // Copyright (C) 2019 Wizardry and Steamworks - License: CC BY 2.0 //
4 ///////////////////////////////////////////////////////////////////////////
5  
6 const mqtt = require('mqtt')
7 const YAML = require('yamljs')
8 const { createLogger, format, transports } = require('winston')
9 const { Client } = require('@elastic/elasticsearch')
10 const qs = require('qs')
11 const path = require('path')
12 const fs = require('fs')
13 const was = require('was.js')
14 const moment = require('moment')
15  
// Read the settings from config.yml.
const config = YAML.load('config.yml')

// The log file sits in the "log" folder next to the real location of this
// script (fs.realpathSync is applied to the script path first).
const scriptDirectory = path.dirname(fs.realpathSync(__filename))
const logFile = path.join(scriptDirectory, "log/corrade-elasticsearch-heartbeat.log")

// Messages go through the splat and simple formats.
const logFormat = format.combine(format.splat(), format.simple())

// Every message is written to the console and to the log file.
const consoleTransport = new transports.Console({ timestamp: true })
const fileTransport = new transports.File({ timestamp: true, filename: logFile })

const logger = createLogger({
    format: logFormat,
    transports: [consoleTransport, fileTransport]
})

// Elasticsearch client pointed at the node given in the configuration.
const elasticsearchOptions = {
    node: config.elasticsearch.connect,
    log: 'trace'
}
const client = new Client(elasticsearchOptions)
45  
// Heartbeat fields that arrive as text and are stored as integers.
const integerHeartbeatFields = [
    'AverageCPUUsage',
    'AverageRAMUsage',
    'AverageThreadsUsage',
    'ExecutingCommands',
    'ManifestingRLVBehaviours',
    'ProcessedCommands',
    'ProcessedRLVBehaviours'
]

/**
 * Tell whether an index creation error only reports that the index exists.
 *
 * Every level of error.meta.body.error is checked before it is read, so an
 * error that carries no response body simply yields false instead of raising
 * a TypeError.
 *
 * @param {object} error - the error passed to the indices.create callback
 * @returns {boolean} true for a 'resource_already_exists_exception'
 */
function isIndexAlreadyExistsError(error) {
    const body = error && error.meta && error.meta.body
    const details = body && body.error
    return Boolean(details) && details.type === 'resource_already_exists_exception'
}

/**
 * Build the document to index from the CSV payload of a heartbeat.
 *
 * The parsed CSV is read as a flat list of alternating keys and values
 * (NOTE(review): assumes CSVToArray returns a flat array - confirm against
 * was.js). A dangling key without a value is skipped.
 *
 * @param {string} csv - the "data" field of the heartbeat notification
 * @returns {object} the heartbeat with typed fields, AgentName and TimeNow
 */
function buildHeartbeat(csv) {
    const fields = was.formats.csv.CSVToArray(csv)
    const beat = {}
    for (let i = 0; i + 1 < fields.length; i += 2) {
        beat[fields[i]] = fields[i + 1]
    }

    // Convert parameters to JavaScript types.
    beat.StartTime = moment(beat.StartTime).toDate()
    integerHeartbeatFields.forEach((name) => {
        // Explicit radix: the value is always read as decimal.
        beat[name] = parseInt(beat[name], 10)
    })

    // Add extra parameters.
    beat.AgentName = config.corrade.name
    beat.TimeNow = moment().toDate()

    return beat
}

/**
 * Parse one MQTT message and, if it is a heartbeat, index it.
 *
 * Anything that is not a heartbeat is ignored. A heartbeat that cannot be
 * parsed is logged and dropped so that the listener never throws.
 *
 * @param {Buffer|string} message - the raw MQTT payload (query string encoded)
 */
function indexHeartbeat(message) {
    // Make an object out of the notification.
    const notification = qs.parse(message.toString())

    if (notification.type !== 'heartbeat') {
        return
    }

    if (typeof notification.data !== 'string') {
        logger.warn('heartbeat notification carries no data, skipping')
        return
    }

    let beat
    try {
        beat = buildHeartbeat(notification.data)
    } catch (err) {
        logger.warn(`could not parse heartbeat data ${err}`)
        return
    }

    client.index({
        index: config.elasticsearch.index,
        body: beat
    }, (err) => {
        if (err) {
            logger.warn(`error whilst sending heartbeat data ${err}`)
        }
    })
}

/**
 * Connect to the Corrade MQTT server, subscribe to the heartbeat topic and
 * index every heartbeat that arrives.
 */
function subscribeToCorrade() {
    const mqttClient = mqtt.connect(config.corrade.mqtt)

    mqttClient.on('reconnect', () => {
        logger.info('Reconnecting to Corrade MQTT server...')
    })

    mqttClient.on('connect', () => {
        logger.info('Connected to Corrade MQTT server.')

        // The heartbeat topic is addressed by group name and password.
        const topic = `${config.corrade.group}/${config.corrade.password}/heartbeat`
        mqttClient.subscribe(topic, (error) => {
            if (error) {
                // A failed subscription is an error, like the other failures.
                logger.error('Error subscribing to Corrade MQTT group messages.')
                return
            }

            logger.info('Subscribed to Corrade MQTT group messages.')
        })
    })

    mqttClient.on('error', (error) => {
        logger.error(`Error found while connecting to Corrade MQTT: ${error}`)
    })

    mqttClient.on('message', (topic, message) => {
        indexHeartbeat(message)
    })
}

// Attempt to ping elasticsearch; everything else only starts once it answers.
client.ping({}, (error, response) => {
    if (error) {
        logger.error(`elasticsearch is down ${JSON.stringify(error)} ${JSON.stringify(response)}`)
        return
    }

    logger.info('elasticsearch is up')

    // Create the index; an index that already exists is not a failure.
    client.indices.create({
        index: config.elasticsearch.index
    }, (error) => {
        if (error) {
            if (!isIndexAlreadyExistsError(error)) {
                logger.error(`could not create elasticsearch index ${error}`)
                return
            }

            logger.info('elasticsearch index already exists')
        }

        subscribeToCorrade()
    })
})
137