corrade-elasticsearch-heartbeat – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | #!/usr/bin/env nodejs |
2 | /////////////////////////////////////////////////////////////////////////// |
||
3 | // Copyright (C) 2019 Wizardry and Steamworks - License: CC BY 2.0 // |
||
4 | /////////////////////////////////////////////////////////////////////////// |
||
5 | |||
6 | const mqtt = require('mqtt') |
||
7 | const YAML = require('yamljs') |
||
8 | const { createLogger, format, transports } = require('winston') |
||
9 | const { Client } = require('@elastic/elasticsearch') |
||
10 | const qs = require('qs') |
||
11 | const path = require('path') |
||
12 | const fs = require('fs') |
||
13 | const was = require('was.js') |
||
14 | const moment = require('moment') |
||
15 | |||
16 | // Load configuration file. |
||
17 | const config = YAML.load('config.yml') |
||
18 | |||
19 | // Set up logger. |
||
20 | const logger = createLogger({ |
||
21 | format: format.combine( |
||
22 | format.splat(), |
||
23 | format.simple() |
||
24 | ), |
||
25 | transports: [ |
||
26 | new transports.Console({ |
||
27 | timestamp: true |
||
28 | }), |
||
29 | new transports.File( |
||
30 | { |
||
31 | timestamp: true, |
||
32 | filename: path.join(path.dirname(fs.realpathSync(__filename)), "log/corrade-elasticsearch-heartbeat.log") |
||
33 | } |
||
34 | ) |
||
35 | ] |
||
36 | }) |
||
37 | |||
38 | // initialize the elasticsearch client |
||
39 | const client = new Client( |
||
40 | { |
||
41 | node: config.elasticsearch.connect, |
||
42 | log: 'trace' |
||
43 | } |
||
44 | ) |
||
45 | |||
46 | // attempt to ping elasticsearch |
||
47 | client.ping({}, function (error, response) { |
||
48 | if (error) { |
||
49 | logger.error(`elasticsearch is down ${JSON.stringify(error)} ${JSON.stringify(response)}`) |
||
50 | return |
||
51 | } |
||
52 | |||
53 | logger.info('elasticsearch is up') |
||
54 | |||
55 | // create the index |
||
56 | client.indices.create({ |
||
57 | index: config.elasticsearch.index, |
||
58 | }, function (error, response, status) { |
||
59 | if (error && error.meta.body.error.type !== 'resource_already_exists_exception') { |
||
60 | logger.error(`could not create elasticsearch index ${error}`) |
||
61 | return |
||
62 | } |
||
63 | |||
64 | if (error && error.meta.body.error.type === 'resource_already_exists_exception') { |
||
65 | logger.info('elasticsearch index already exists') |
||
66 | } |
||
67 | |||
68 | |||
69 | // Subscribe to Corrade MQTT. |
||
70 | const mqttClient = mqtt.connect(config.corrade.mqtt) |
||
71 | |||
72 | mqttClient.on('reconnect', () => { |
||
73 | logger.info('Reconnecting to Corrade MQTT server...') |
||
74 | }) |
||
75 | |||
76 | mqttClient.on('connect', () => { |
||
77 | logger.info('Connected to Corrade MQTT server.') |
||
78 | // Subscribe to group message notifications with group name and password. |
||
79 | mqttClient.subscribe(`${config.corrade.group}/${config.corrade.password}/heartbeat`, (error) => { |
||
80 | if (error) { |
||
81 | logger.info('Error subscribing to Corrade MQTT group messages.') |
||
82 | return |
||
83 | } |
||
84 | |||
85 | logger.info('Subscribed to Corrade MQTT group messages.') |
||
86 | }) |
||
87 | }) |
||
88 | |||
89 | mqttClient.on('error', (error) => { |
||
90 | logger.error(`Error found while connecting to Corrade MQTT: ${error}`) |
||
91 | }) |
||
92 | |||
93 | mqttClient.on('message', (topic, message) => { |
||
94 | // Make an object out of the notification. |
||
95 | let notification = qs.parse(message.toString()) |
||
96 | |||
97 | // Check the notification parameters for sanity. |
||
98 | if (typeof notification.type === 'undefined' || |
||
99 | notification.type !== 'heartbeat') |
||
100 | return |
||
101 | |||
102 | var beat = {}; |
||
103 | let data = was.formats.csv.CSVToArray(notification.data) |
||
104 | data.forEach((value, index) => { |
||
105 | if (index % 2 !== 0) |
||
106 | return |
||
107 | |||
108 | beat[value] = data[index + 1] |
||
109 | |||
110 | }) |
||
111 | |||
112 | // Convert parameters to JavaScript types. |
||
113 | beat.StartTime = moment(beat.StartTime).toDate() |
||
114 | beat.AverageCPUUsage = parseInt(beat.AverageCPUUsage) |
||
115 | beat.AverageRAMUsage = parseInt(beat.AverageRAMUsage) |
||
116 | beat.AverageThreadsUsage = parseInt(beat.AverageThreadsUsage) |
||
117 | beat.ExecutingCommands = parseInt(beat.ExecutingCommands) |
||
118 | beat.ManifestingRLVBehaviours = parseInt(beat.ManifestingRLVBehaviours) |
||
119 | beat.ProcessedCommands = parseInt(beat.ProcessedCommands) |
||
120 | beat.ProcessedRLVBehaviours = parseInt(beat.ProcessedRLVBehaviours) |
||
121 | |||
122 | // Add extra parameters. |
||
123 | beat.AgentName = config.corrade.name |
||
124 | beat.TimeNow = moment().toDate() |
||
125 | |||
126 | client.index({ |
||
127 | index: config.elasticsearch.index, |
||
128 | body: beat |
||
129 | }, function (err, resp, status) { |
||
130 | if (err) { |
||
131 | logger.warn(`error whilst sending heartbeat data ${err}`) |
||
132 | } |
||
133 | }) |
||
134 | }) |
||
135 | }) |
||
136 | }) |
||
137 |