1
0
mirror of https://github.com/netdata/netdata.git synced 2021-06-06 23:03:21 +03:00

Implement new incremental parser (#9074)

Implemented a new parser for the pluginsd language
This commit is contained in:
Stelios Fragkakis
2020-05-25 20:25:08 +03:00
committed by GitHub
parent 4fb41597da
commit 570d5253f5
12 changed files with 1315 additions and 498 deletions

View File

@@ -105,6 +105,7 @@ SUBDIRS += \
streaming \
web \
claim \
parser \
aclk \
spawn \
$(NULL)
@@ -325,6 +326,8 @@ MACOS_PLUGIN_FILES = \
PLUGINSD_PLUGIN_FILES = \
collectors/plugins.d/plugins_d.c \
collectors/plugins.d/plugins_d.h \
collectors/plugins.d/pluginsd_parser.c \
collectors/plugins.d/pluginsd_parser.h \
$(NULL)
RRD_PLUGIN_FILES = \
@@ -472,6 +475,11 @@ CLAIM_FILES = \
claim/claim.h \
$(NULL)
PARSER_FILES = \
parser/parser.c \
parser/parser.h \
$(NULL)
ACLK_FILES = \
aclk/aclk_common.c \
aclk/aclk_common.h \
@@ -597,6 +605,7 @@ NETDATA_FILES = \
$(STATSD_PLUGIN_FILES) \
$(WEB_PLUGIN_FILES) \
$(CLAIM_FILES) \
$(PARSER_FILES) \
$(ACLK_FILES) \
$(SPAWN_PLUGIN_FILES) \
$(NULL)

View File

@@ -1,22 +1,22 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "plugins_d.h"
#include "pluginsd_parser.h"
char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL };
struct plugind *pluginsd_root = NULL;
static inline int pluginsd_space(char c)
{
switch (c) {
case ' ':
case '\t':
case '\r':
case '\n':
case '=':
return 1;
inline int pluginsd_space(char c) {
switch(c) {
case ' ':
case '\t':
case '\r':
case '\n':
case '=':
return 1;
default:
return 0;
default:
return 0;
}
}
@@ -36,10 +36,11 @@ inline int config_isspace(char c)
}
// split a text into words, respecting quotes
static inline int quoted_strings_splitter(char *str, char **words, int max_words, int (*custom_isspace)(char))
static inline int quoted_strings_splitter(char *str, char **words, int max_words, int (*custom_isspace)(char), char *recover_input, char **recover_location, int max_recover)
{
char *s = str, quote = 0;
int i = 0, j;
int i = 0, j, rec = 0;
char *recover = recover_input;
// skip all white space
while (unlikely(custom_isspace(*s)))
@@ -65,6 +66,10 @@ static inline int quoted_strings_splitter(char *str, char **words, int max_words
// if it is quote
else if (unlikely(*s == quote)) {
quote = 0;
if (recover && rec < max_recover) {
recover_location[rec++] = s;
*recover++ = *s;
}
*s = ' ';
continue;
}
@@ -72,6 +77,12 @@ static inline int quoted_strings_splitter(char *str, char **words, int max_words
// if it is a space
else if (unlikely(quote == 0 && custom_isspace(*s))) {
// terminate the word
if (recover && rec < max_recover) {
if (!rec || (rec && recover_location[rec-1] != s)) {
recover_location[rec++] = s;
*recover++ = *s;
}
}
*s++ = '\0';
// skip all white space
@@ -120,12 +131,12 @@ inline int pluginsd_initialize_plugin_directories()
}
// Parse it and store it to plugin directories
return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace);
return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0);
}
inline int pluginsd_split_words(char *str, char **words, int max_words)
inline int pluginsd_split_words(char *str, char **words, int max_words, char *recover_input, char **recover_location, int max_recover)
{
return quoted_strings_splitter(str, words, max_words, pluginsd_space);
return quoted_strings_splitter(str, words, max_words, pluginsd_space, recover_input, recover_location, max_recover);
}
#ifdef ENABLE_HTTPS
@@ -226,482 +237,6 @@ char *pluginsd_get_from_buffer(char *output, int *bytesread, char *input, SSL *s
}
#endif
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations)
{
int enabled = cd->enabled;
if (!fp || !enabled) {
cd->enabled = 0;
return 0;
}
size_t count = 0;
char line[PLUGINSD_LINE_MAX + 1];
char *words[PLUGINSD_MAX_WORDS] = { NULL };
uint32_t BEGIN_HASH = simple_hash(PLUGINSD_KEYWORD_BEGIN);
uint32_t END_HASH = simple_hash(PLUGINSD_KEYWORD_END);
uint32_t FLUSH_HASH = simple_hash(PLUGINSD_KEYWORD_FLUSH);
uint32_t CHART_HASH = simple_hash(PLUGINSD_KEYWORD_CHART);
uint32_t DIMENSION_HASH = simple_hash(PLUGINSD_KEYWORD_DIMENSION);
uint32_t DISABLE_HASH = simple_hash(PLUGINSD_KEYWORD_DISABLE);
uint32_t VARIABLE_HASH = simple_hash(PLUGINSD_KEYWORD_VARIABLE);
uint32_t LABEL_HASH = simple_hash(PLUGINSD_KEYWORD_LABEL);
uint32_t OVERWRITE_HASH = simple_hash(PLUGINSD_KEYWORD_OVERWRITE);
RRDSET *st = NULL;
uint32_t hash;
struct label *new_labels = NULL;
errno = 0;
clearerr(fp);
if (unlikely(fileno(fp) == -1)) {
error("file descriptor given is not a valid stream");
goto cleanup;
}
#ifdef ENABLE_HTTPS
int bytesleft = 0;
char tmpbuffer[PLUGINSD_LINE_MAX];
char *readfrom = NULL;
#endif
char *r = NULL;
while (!ferror(fp)) {
if (unlikely(netdata_exit))
break;
#ifdef ENABLE_HTTPS
int normalread = 1;
if (netdata_srv_ctx) {
if (host->stream_ssl.conn && !host->stream_ssl.flags) {
if (!bytesleft) {
r = line;
readfrom = tmpbuffer;
bytesleft = pluginsd_update_buffer(readfrom, host->stream_ssl.conn);
if (bytesleft <= 0) {
break;
}
}
readfrom = pluginsd_get_from_buffer(line, &bytesleft, readfrom, host->stream_ssl.conn, tmpbuffer);
if (!readfrom) {
r = NULL;
}
normalread = 0;
}
}
if (normalread) {
r = fgets(line, PLUGINSD_LINE_MAX, fp);
}
#else
r = fgets(line, PLUGINSD_LINE_MAX, fp);
#endif
if (unlikely(!r)) {
if (feof(fp))
error("read failed: end of file");
else if (ferror(fp))
error("read failed: input error");
else
error("read failed: unknown error");
break;
}
if (unlikely(netdata_exit))
break;
line[PLUGINSD_LINE_MAX] = '\0';
int w = pluginsd_split_words(line, words, PLUGINSD_MAX_WORDS);
char *s = words[0];
if (unlikely(!s || !*s || !w)) {
continue;
}
// debug(D_PLUGINSD, "PLUGINSD: words 0='%s' 1='%s' 2='%s' 3='%s' 4='%s' 5='%s' 6='%s' 7='%s' 8='%s' 9='%s'", words[0], words[1], words[2], words[3], words[4], words[5], words[6], words[7], words[8], words[9]);
if (likely(!simple_hash_strcmp(s, "SET", &hash))) {
char *dimension = words[1];
char *value = words[2];
if (unlikely(!dimension || !*dimension)) {
error(
"requested a SET on chart '%s' of host '%s', without a dimension. Disabling it.", st->id,
host->hostname);
enabled = 0;
break;
}
if (unlikely(!value || !*value))
value = NULL;
if (unlikely(!st)) {
error(
"requested a SET on dimension %s with value %s on host '%s', without a BEGIN. Disabling it.",
dimension, value ? value : "<nothing>", host->hostname);
enabled = 0;
break;
}
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "is setting dimension %s/%s to %s", st->id, dimension, value ? value : "<nothing>");
if (value) {
RRDDIM *rd = rrddim_find(st, dimension);
if (unlikely(!rd)) {
error(
"requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.",
dimension, st->name, st->id, st->rrdhost->hostname);
enabled = 0;
break;
} else
rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
}
} else if (likely(hash == BEGIN_HASH && !strcmp(s, PLUGINSD_KEYWORD_BEGIN))) {
char *id = words[1];
char *microseconds_txt = words[2];
if (unlikely(!id)) {
error("requested a BEGIN without a chart id for host '%s'. Disabling it.", host->hostname);
enabled = 0;
break;
}
st = rrdset_find(host, id);
if (unlikely(!st)) {
error(
"requested a BEGIN on chart '%s', which does not exist on host '%s'. Disabling it.", id,
host->hostname);
enabled = 0;
break;
}
if (likely(st->counter_done)) {
usec_t microseconds = 0;
if (microseconds_txt && *microseconds_txt)
microseconds = str2ull(microseconds_txt);
if (likely(microseconds)) {
if (trust_durations)
rrdset_next_usec_unfiltered(st, microseconds);
else
rrdset_next_usec(st, microseconds);
} else
rrdset_next(st);
}
} else if (likely(hash == END_HASH && !strcmp(s, PLUGINSD_KEYWORD_END))) {
if (unlikely(!st)) {
error("requested an END, without a BEGIN on host '%s'. Disabling it.", host->hostname);
enabled = 0;
break;
}
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "requested an END on chart %s", st->id);
rrdset_done(st);
st = NULL;
count++;
} else if (likely(hash == CHART_HASH && !strcmp(s, PLUGINSD_KEYWORD_CHART))) {
st = NULL;
char *type = words[1];
char *name = words[2];
char *title = words[3];
char *units = words[4];
char *family = words[5];
char *context = words[6];
char *chart = words[7];
char *priority_s = words[8];
char *update_every_s = words[9];
char *options = words[10];
char *plugin = words[11];
char *module = words[12];
// parse the id from type
char *id = NULL;
if (likely(type && (id = strchr(type, '.')))) {
*id = '\0';
id++;
}
// make sure we have the required variables
if (unlikely(!type || !*type || !id || !*id)) {
error("requested a CHART, without a type.id, on host '%s'. Disabling it.", host->hostname);
enabled = 0;
break;
}
// parse the name, and make sure it does not include 'type.'
if (unlikely(name && *name)) {
// when data are coming from slaves
// name will be type.name
// so we have to remove 'type.' from name too
size_t len = strlen(type);
if (strncmp(type, name, len) == 0 && name[len] == '.')
name = &name[len + 1];
// if the name is the same with the id,
// or is just 'NULL', clear it.
if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
name = NULL;
}
int priority = 1000;
if (likely(priority_s && *priority_s))
priority = str2i(priority_s);
int update_every = cd->update_every;
if (likely(update_every_s && *update_every_s))
update_every = str2i(update_every_s);
if (unlikely(!update_every))
update_every = cd->update_every;
RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
if (unlikely(chart))
chart_type = rrdset_type_id(chart);
if (unlikely(name && !*name))
name = NULL;
if (unlikely(family && !*family))
family = NULL;
if (unlikely(context && !*context))
context = NULL;
if (unlikely(!title))
title = "";
if (unlikely(!units))
units = "unknown";
debug(
D_PLUGINSD,
"creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
priority, update_every);
st = rrdset_create(
host, type, id, name, family, context, title, units, (plugin && *plugin) ? plugin : cd->filename,
module, priority, update_every, chart_type);
if (options && *options) {
if (strstr(options, "obsolete"))
rrdset_is_obsolete(st);
else
rrdset_isnot_obsolete(st);
if (strstr(options, "detail"))
rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
else
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
if (strstr(options, "hidden"))
rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
else
rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
if (strstr(options, "store_first"))
rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
else
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
} else {
rrdset_isnot_obsolete(st);
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
} else if (likely(hash == DIMENSION_HASH && !strcmp(s, PLUGINSD_KEYWORD_DIMENSION))) {
char *id = words[1];
char *name = words[2];
char *algorithm = words[3];
char *multiplier_s = words[4];
char *divisor_s = words[5];
char *options = words[6];
if (unlikely(!id || !*id)) {
error(
"requested a DIMENSION, without an id, host '%s' and chart '%s'. Disabling it.", host->hostname,
st ? st->id : "UNSET");
enabled = 0;
break;
}
if (unlikely(!st)) {
error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", host->hostname);
enabled = 0;
break;
}
long multiplier = 1;
if (multiplier_s && *multiplier_s)
multiplier = strtol(multiplier_s, NULL, 0);
if (unlikely(!multiplier))
multiplier = 1;
long divisor = 1;
if (likely(divisor_s && *divisor_s))
divisor = strtol(divisor_s, NULL, 0);
if (unlikely(!divisor))
divisor = 1;
if (unlikely(!algorithm || !*algorithm))
algorithm = "absolute";
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(
D_PLUGINSD,
"creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
st->id, id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
options ? options : "");
RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (options && *options) {
if (strstr(options, "obsolete") != NULL)
rrddim_is_obsolete(st, rd);
else
rrddim_isnot_obsolete(st, rd);
if (strstr(options, "hidden") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
if (strstr(options, "noreset") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (strstr(options, "nooverflow") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
} else {
rrddim_isnot_obsolete(st, rd);
}
} else if (likely(hash == VARIABLE_HASH && !strcmp(s, PLUGINSD_KEYWORD_VARIABLE))) {
char *name = words[1];
char *value = words[2];
int global = (st) ? 0 : 1;
if (name && *name) {
if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
global = 1;
name = words[2];
value = words[3];
} else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
global = 0;
name = words[2];
value = words[3];
}
}
if (unlikely(!name || !*name)) {
error("requested a VARIABLE on host '%s', without a variable name. Disabling it.", host->hostname);
enabled = 0;
break;
}
if (unlikely(!value || !*value))
value = NULL;
if (value) {
char *endptr = NULL;
calculated_number v = (calculated_number)str2ld(value, &endptr);
if (unlikely(endptr && *endptr)) {
if (endptr == value)
error(
"the value '%s' of VARIABLE '%s' on host '%s' cannot be parsed as a number", value, name,
host->hostname);
else
error(
"the value '%s' of VARIABLE '%s' on host '%s' has leftovers: '%s'", value, name,
host->hostname, endptr);
}
if (global) {
RRDVAR *rv = rrdvar_custom_host_variable_create(host, name);
if (rv)
rrdvar_custom_host_variable_set(host, rv, v);
else
error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, host->hostname);
} else if (st) {
RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(st, name);
if (rs)
rrdsetvar_custom_chart_variable_set(rs, v);
else
error(
"cannot find/create CHART VARIABLE '%s' on host '%s', chart '%s'", name, host->hostname,
st->id);
} else
error("cannot find/create CHART VARIABLE '%s' on host '%s' without a chart", name, host->hostname);
} else
error(
"cannot set %s VARIABLE '%s' on host '%s' to an empty value", (global) ? "HOST" : "CHART", name,
host->hostname);
} else if (likely(hash == FLUSH_HASH && !strcmp(s, PLUGINSD_KEYWORD_FLUSH))) {
debug(D_PLUGINSD, "requested a FLUSH");
st = NULL;
} else if (unlikely(hash == DISABLE_HASH && !strcmp(s, PLUGINSD_KEYWORD_DISABLE))) {
info("called DISABLE. Disabling it.");
enabled = 0;
break;
} else if (likely(hash == LABEL_HASH && !strcmp(s, PLUGINSD_KEYWORD_LABEL))) {
debug(D_PLUGINSD, "requested a LABEL CHANGE");
char *store;
if (!words[4])
store = words[3];
else {
store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char));
size_t remaining = PLUGINSD_LINE_MAX;
char *move = store;
int i = 3;
while (i < w) {
size_t length = strlen(words[i]);
if ((length + 1) >= remaining)
break;
remaining -= (length + 1);
memcpy(move, words[i], length);
move += length;
*move++ = ' ';
i++;
if (!words[i])
break;
}
}
new_labels = add_label_to_list(new_labels, words[1], store, strtol(words[2], NULL, 10));
if (store != words[3])
freez(store);
} else if (likely(hash == OVERWRITE_HASH && !strcmp(s, PLUGINSD_KEYWORD_OVERWRITE))) {
debug(D_PLUGINSD, "requested a OVERWITE a variable");
if (!host->labels) {
host->labels = new_labels;
} else {
rrdhost_rdlock(host);
replace_label_list(host, new_labels);
rrdhost_unlock(host);
}
new_labels = NULL;
} else {
error("sent command '%s' which is not known by netdata, for host '%s'. Disabling it.", s, host->hostname);
enabled = 0;
break;
}
}
cleanup:
cd->enabled = enabled;
if (new_labels)
free_host_labels(new_labels);
if (likely(count)) {
cd->successful_collections += count;
cd->serial_failures = 0;
} else
cd->serial_failures++;
return count;
}
static void pluginsd_worker_thread_cleanup(void *arg)
{
@@ -809,9 +344,7 @@ void *pluginsd_worker_thread(void *arg)
info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid);
count = pluginsd_process(localhost, cd, fp, 0);
error(
"'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid,
count);
error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count);
killpid(cd->pid);
int worker_ret_code = mypclose(fp, cd->pid);

View File

@@ -31,6 +31,11 @@
#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
#define PLUGINSD_KEYWORD_LABEL "LABEL"
#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
#define PLUGINSD_KEYWORD_CONTEXT "CONTEXT"
#define PLUGINSD_KEYWORD_GUID "GUID"
#define PLUGINSD_KEYWORD_HOST "HOST"
#define PLUGINSD_KEYWORD_TOMBSTONE "TOMBSTONE"
#define PLUGINSD_LINE_MAX 1024
#define PLUGINSD_LINE_MAX_SSL_READ 512
@@ -60,7 +65,7 @@ struct plugind {
volatile sig_atomic_t enabled; // if this is enabled or not
time_t started_t;
uint32_t version;
struct plugind *next;
};
@@ -69,10 +74,13 @@ extern struct plugind *pluginsd_root;
extern void *pluginsd_main(void *ptr);
extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations);
extern int pluginsd_split_words(char *str, char **words, int max_words);
extern int pluginsd_split_words(char *str, char **words, int max_words, char *recover_string, char **recover_location, int max_recover);
extern int pluginsd_initialize_plugin_directories();
extern int config_isspace(char c);
extern int pluginsd_space(char c);
extern int pluginsd_update_buffer(char *output, SSL *ssl);
extern char * pluginsd_get_from_buffer(char *output, int *bytesread, char *input, SSL *ssl, char *src);
#endif /* NETDATA_PLUGINS_D_H */

View File

@@ -0,0 +1,630 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "pluginsd_parser.h"
/*
* This is the action defined for the FLUSH command
*/
PARSER_RC pluginsd_set_action(void *user, RRDSET *st, RRDDIM *rd, long long int value)
{
UNUSED(user);
rrddim_set_by_pointer(st, rd, value);
return PARSER_RC_OK;
}
PARSER_RC pluginsd_flush_action(void *user, RRDSET *st)
{
UNUSED(user);
UNUSED(st);
return PARSER_RC_OK;
}
PARSER_RC pluginsd_begin_action(void *user, RRDSET *st, usec_t microseconds, int trust_durations)
{
UNUSED(user);
if (likely(st->counter_done)) {
if (likely(microseconds)) {
if (trust_durations)
rrdset_next_usec_unfiltered(st, microseconds);
else
rrdset_next_usec(st, microseconds);
} else
rrdset_next(st);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_end_action(void *user, RRDSET *st)
{
UNUSED(user);
rrdset_done(st);
return PARSER_RC_OK;
}
PARSER_RC pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin,
char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options)
{
RRDSET *st = NULL;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
st = rrdset_create(
host, type, id, name, family, context, title, units,
plugin, module, priority, update_every,
chart_type);
if (options && *options) {
if (strstr(options, "obsolete"))
rrdset_is_obsolete(st);
else
rrdset_isnot_obsolete(st);
if (strstr(options, "detail"))
rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
else
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
if (strstr(options, "hidden"))
rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
else
rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
if (strstr(options, "store_first"))
rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
else
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
} else {
rrdset_isnot_obsolete(st);
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
((PARSER_USER_OBJECT *)user)->st = st;
return PARSER_RC_OK;
}
PARSER_RC pluginsd_disable_action(void *user)
{
UNUSED(user);
info("called DISABLE. Disabling it.");
((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
PARSER_RC pluginsd_variable_action(void *user, RRDHOST *host, RRDSET *st, char *name, int global, calculated_number value)
{
UNUSED(user);
if (global) {
RRDVAR *rv = rrdvar_custom_host_variable_create(host, name);
if (rv)
rrdvar_custom_host_variable_set(host, rv, value);
else
error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, host->hostname);
} else {
RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(st, name);
if (rs)
rrdsetvar_custom_chart_variable_set(rs, value);
else
error("cannot find/create CHART VARIABLE '%s' on host '%s', chart '%s'", name, host->hostname, st->id);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options,
RRD_ALGORITHM algorithm_type)
{
UNUSED(user);
UNUSED(algorithm);
RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, algorithm_type);
rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (options && *options) {
if (strstr(options, "obsolete") != NULL)
rrddim_is_obsolete(st, rd);
else
rrddim_isnot_obsolete(st, rd);
if (strstr(options, "hidden") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
if (strstr(options, "noreset") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (strstr(options, "nooverflow") != NULL)
rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
} else {
rrddim_isnot_obsolete(st, rd);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_label_action(void *user, char *key, char *value, LABEL_SOURCE source)
{
((PARSER_USER_OBJECT *) user)->new_labels = add_label_to_list(((PARSER_USER_OBJECT *) user)->new_labels, key, value, source);
return PARSER_RC_OK;
}
PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, struct label *new_labels)
{
UNUSED(user);
if (!host->labels) {
host->labels = new_labels;
} else {
rrdhost_rdlock(host);
replace_label_list(host, new_labels);
rrdhost_unlock(host);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
char *dimension = words[1];
char *value = words[2];
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!dimension || !*dimension)) {
error("requested a SET on chart '%s' of host '%s', without a dimension. Disabling it.", st->id, host->hostname);
goto disable;
}
if (unlikely(!value || !*value))
value = NULL;
if (unlikely(!st)) {
error(
"requested a SET on dimension %s with value %s on host '%s', without a BEGIN. Disabling it.", dimension,
value ? value : "<nothing>", host->hostname);
goto disable;
}
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "is setting dimension %s/%s to %s", st->id, dimension, value ? value : "<nothing>");
if (value) {
RRDDIM *rd = rrddim_find(st, dimension);
if (unlikely(!rd)) {
error(
"requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.",
dimension, st->name, st->id, st->rrdhost->hostname);
goto disable;
} else {
if (plugins_action->set_action) {
return plugins_action->set_action(
user, st, rd, strtoll(value, NULL, 0));
}
}
}
return PARSER_RC_OK;
disable:
((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
char *id = words[1];
char *microseconds_txt = words[2];
RRDSET *st = NULL;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
if (unlikely(!id)) {
error("requested a BEGIN without a chart id for host '%s'. Disabling it.", host->hostname);
goto disable;
}
st = rrdset_find(host, id);
if (unlikely(!st)) {
error("requested a BEGIN on chart '%s', which does not exist on host '%s'. Disabling it.", id, host->hostname);
goto disable;
}
((PARSER_USER_OBJECT *)user)->st = st;
usec_t microseconds = 0;
if (microseconds_txt && *microseconds_txt)
microseconds = str2ull(microseconds_txt);
if (plugins_action->begin_action) {
return plugins_action->begin_action(user, st, microseconds, ((PARSER_USER_OBJECT *)user)->trust_durations);
}
return PARSER_RC_OK;
disable:
((PARSER_USER_OBJECT *)user)->enabled = 0;
return PARSER_RC_ERROR;
}
PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(words);
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
error("requested an END, without a BEGIN on host '%s'. Disabling it.", host->hostname);
((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "requested an END on chart %s", st->id);
((PARSER_USER_OBJECT *) user)->st = NULL;
((PARSER_USER_OBJECT *) user)->count++;
if (plugins_action->end_action) {
return plugins_action->end_action(user, st);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
char *type = words[1];
char *name = words[2];
char *title = words[3];
char *units = words[4];
char *family = words[5];
char *context = words[6];
char *chart = words[7];
char *priority_s = words[8];
char *update_every_s = words[9];
char *options = words[10];
char *plugin = words[11];
char *module = words[12];
int have_action = ((plugins_action->chart_action) != NULL);
// parse the id from type
char *id = NULL;
if (likely(type && (id = strchr(type, '.')))) {
*id = '\0';
id++;
}
// make sure we have the required variables
if (unlikely((!type || !*type || !id || !*id) && !have_action)) {
error("requested a CHART, without a type.id, on host '%s'. Disabling it.", host->hostname);
((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
// parse the name, and make sure it does not include 'type.'
if (unlikely(name && *name)) {
// when data are coming from slaves
// name will be type.name
// so we have to remove 'type.' from name too
size_t len = strlen(type);
if (strncmp(type, name, len) == 0 && name[len] == '.')
name = &name[len + 1];
// if the name is the same with the id,
// or is just 'NULL', clear it.
if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
name = NULL;
}
int priority = 1000;
if (likely(priority_s && *priority_s))
priority = str2i(priority_s);
int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
if (likely(update_every_s && *update_every_s))
update_every = str2i(update_every_s);
if (unlikely(!update_every))
update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
if (unlikely(chart))
chart_type = rrdset_type_id(chart);
if (unlikely(name && !*name))
name = NULL;
if (unlikely(family && !*family))
family = NULL;
if (unlikely(context && !*context))
context = NULL;
if (unlikely(!title))
title = "";
if (unlikely(!units))
units = "unknown";
debug(
D_PLUGINSD,
"creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
priority, update_every);
if (have_action) {
return plugins_action->chart_action(
user, type, id, name, family, context, title, units,
(plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, module, priority, update_every,
chart_type, options);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
char *id = words[1];
char *name = words[2];
char *algorithm = words[3];
char *multiplier_s = words[4];
char *divisor_s = words[5];
char *options = words[6];
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!id)) {
error(
"requested a DIMENSION, without an id, host '%s' and chart '%s'. Disabling it.", host->hostname,
st ? st->id : "UNSET");
goto disable;
}
if (unlikely(!st)) {
error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", host->hostname);
goto disable;
}
long multiplier = 1;
if (multiplier_s && *multiplier_s) {
multiplier = strtol(multiplier_s, NULL, 0);
if (unlikely(!multiplier))
multiplier = 1;
}
long divisor = 1;
if (likely(divisor_s && *divisor_s)) {
divisor = strtol(divisor_s, NULL, 0);
if (unlikely(!divisor))
divisor = 1;
}
if (unlikely(!algorithm || !*algorithm))
algorithm = "absolute";
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(
D_PLUGINSD,
"creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
st->id, id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
options ? options : "");
if (plugins_action->dimension_action) {
return plugins_action->dimension_action(
user, st, id, name, algorithm,
multiplier, divisor, (options && *options)?options:NULL, rrd_algorithm_id(algorithm));
}
return PARSER_RC_OK;
disable:
((PARSER_USER_OBJECT *)user)->enabled = 0;
return PARSER_RC_ERROR;
}
PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
char *name = words[1];
char *value = words[2];
calculated_number v;
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
int global = (st) ? 0 : 1;
if (name && *name) {
if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
global = 1;
name = words[2];
value = words[3];
} else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
global = 0;
name = words[2];
value = words[3];
}
}
if (unlikely(!name || !*name)) {
error("requested a VARIABLE on host '%s', without a variable name. Disabling it.", host->hostname);
((PARSER_USER_OBJECT *)user)->enabled = 0;
return PARSER_RC_ERROR;
}
if (unlikely(!value || !*value))
value = NULL;
if (unlikely(!value)) {
error("cannot set %s VARIABLE '%s' on host '%s' to an empty value", (global) ? "HOST" : "CHART", name,
host->hostname);
return PARSER_RC_OK;
}
if (!global && !st) {
error("cannot find/create CHART VARIABLE '%s' on host '%s' without a chart", name, host->hostname);
return PARSER_RC_OK;
}
char *endptr = NULL;
v = (calculated_number)str2ld(value, &endptr);
if (unlikely(endptr && *endptr)) {
if (endptr == value)
error(
"the value '%s' of VARIABLE '%s' on host '%s' cannot be parsed as a number", value, name,
host->hostname);
else
error(
"the value '%s' of VARIABLE '%s' on host '%s' has leftovers: '%s'", value, name, host->hostname,
endptr);
}
if (plugins_action->variable_action) {
return plugins_action->variable_action(user, host, st, name, global, v);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(words);
debug(D_PLUGINSD, "requested a FLUSH");
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
((PARSER_USER_OBJECT *) user)->st = NULL;
if (plugins_action->flush_action) {
return plugins_action->flush_action(user, st);
}
return PARSER_RC_OK;
}
PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(user);
UNUSED(words);
if (plugins_action->disable_action) {
return plugins_action->disable_action(user);
}
return PARSER_RC_ERROR;
}
PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
char *store;
if (!words[4])
store = words[3];
else {
store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char));
size_t remaining = PLUGINSD_LINE_MAX;
char *move = store;
int i = 3;
while (i < PLUGINSD_MAX_WORDS) {
size_t length = strlen(words[i]);
if ((length + 1) >= remaining)
break;
remaining -= (length + 1);
memcpy(move, words[i], length);
move += length;
*move++ = ' ';
i++;
if (!words[i])
break;
}
}
if (plugins_action->label_action) {
PARSER_RC rc = plugins_action->label_action(user, words[1], store, strtol(words[2], NULL, 10));
if (store != words[3])
freez(store);
return rc;
}
if (store != words[3])
freez(store);
return PARSER_RC_OK;
}
PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(words);
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
debug(D_PLUGINSD, "requested a OVERWITE a variable");
struct label *new_labels = ((PARSER_USER_OBJECT *)user)->new_labels;
((PARSER_USER_OBJECT *)user)->new_labels = NULL;
if (plugins_action->overwrite_action) {
return plugins_action->overwrite_action(user, host, new_labels);
}
return PARSER_RC_OK;
}
// New plugins.d parser
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations)
{
int enabled = cd->enabled;
if (!fp || !enabled) {
cd->enabled = 0;
return 0;
}
if (unlikely(fileno(fp) == -1)) {
error("file descriptor given is not a valid stream");
cd->serial_failures++;
return 0;
}
clearerr(fp);
PARSER_USER_OBJECT *user = callocz(1, sizeof(*user));
((PARSER_USER_OBJECT *) user)->enabled = cd->enabled;
((PARSER_USER_OBJECT *) user)->host = host;
((PARSER_USER_OBJECT *) user)->cd = cd;
((PARSER_USER_OBJECT *) user)->trust_durations = trust_durations;
PARSER *parser = parser_init(host, user, fp, PARSER_INPUT_SPLIT);
if (unlikely(!parser)) {
error("Failed to initialize parser");
cd->serial_failures++;
return 0;
}
parser->plugins_action->begin_action = &pluginsd_begin_action;
parser->plugins_action->flush_action = &pluginsd_flush_action;
parser->plugins_action->end_action = &pluginsd_end_action;
parser->plugins_action->disable_action = &pluginsd_disable_action;
parser->plugins_action->variable_action = &pluginsd_variable_action;
parser->plugins_action->dimension_action = &pluginsd_dimension_action;
parser->plugins_action->label_action = &pluginsd_label_action;
parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
parser->plugins_action->chart_action = &pluginsd_chart_action;
parser->plugins_action->set_action = &pluginsd_set_action;
user->parser = parser;
while (likely(!parser_next(parser))) {
if (unlikely(netdata_exit || parser_action(parser, NULL)))
break;
}
info("PARSER ended");
parser_destroy(parser);
cd->enabled = ((PARSER_USER_OBJECT *) user)->enabled;
size_t count = ((PARSER_USER_OBJECT *) user)->count;
freez(user);
if (likely(count)) {
cd->successful_collections += count;
cd->serial_failures = 0;
} else
cd->serial_failures++;
return count;
}

View File

@@ -0,0 +1,20 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_PLUGINSD_PARSER_H
#define NETDATA_PLUGINSD_PARSER_H
#include "../../parser/parser.h"
typedef struct parser_user_object {
PARSER *parser;
RRDSET *st;
RRDHOST *host;
struct plugind *cd;
int trust_durations;
struct label *new_labels;
size_t count;
int enabled;
} PARSER_USER_OBJECT;
#endif //NETDATA_PLUGINSD_PARSER_H

View File

@@ -1324,7 +1324,7 @@ static int statsd_readfile(const char *filename, STATSD_APP *app, STATSD_APP_CHA
else if (!strcmp(name, "dimension")) {
// metric [name [type [multiplier [divisor]]]]
char *words[10];
pluginsd_split_words(value, words, 10);
pluginsd_split_words(value, words, 10, NULL, NULL, 0);
int pattern = 0;
size_t i = 0;

View File

@@ -1491,6 +1491,7 @@ AC_CONFIG_FILES([
claim/Makefile
aclk/Makefile
spawn/Makefile
parser/Makefile
])
AC_OUTPUT

9
parser/Makefile.am Normal file
View File

@@ -0,0 +1,9 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
dist_noinst_DATA = \
README.md \
$(NULL)

147
parser/README.md Normal file
View File

@@ -0,0 +1,147 @@
#### Introduction
The parser will be used to process streaming and plugins input as well as metadata
Usage
1. Define a structure that will be used to share user state across calls
1. Initialize the parser using `parser_init`
2. Register keywords and assosiated callback function using `parser_add_keyword`
3. Register actions on the keywords
4. Start a loop until EOF
1. Fetch the next line using `parser_next`
2. Process the line using `parser_action`
1. The registered callbacks are executed to parse the input
2. The registered action for the callback is called for processing
4. Release the parser using `parser_destroy`
5. Release the user structure
#### Functions
----
##### parse_init(RRDHOST *host, void *user, void *input, int flags)
Initialize an internal parser with the specified user defined data structure that will be shared across calls.
Input
- Host
- The host this parser will be dealing with. For streaming with SSL enabled for this host
- user
- User defined structure that is passed in all the calls
- input
- Where the parser will get the input from
- flags
- flags to define processing on the input
Output
- A parser structure
----
##### parse_push(PARSER *parser, char *line)
Push a new line for processing
Input
- parser
- The parser object as returned by the `parser_init`
- line
- The new line to process
Output
- The line will be injected into the stream and will be the next one to be processed
Returns
- 0 line added
- 1 error detected
----
##### parse_add_keyword(PARSER *parser, char *keyword, keyword_function callback_function)
The function will add callbacks for keywords. The callback function is defined as
`typedef PARSER_RC (*keyword_function)(char **, void *);`
Input
- parser
- The parser object as returned by the `parser_init`
- keyword
- The keyword to register
- keyword_function
- The callback that will handle the keyword processing
* The callback function should return one of the following
* PARSER_RC_OK -- Callback was successful (continue with other callbacks)
* PARSER_RC_STOP -- Stop processing callbacks (return OK)
* PARSER_RC_ERROR -- Callback failed, exit
Output
- The correspoding keyword and callback will be registered
Returns
- 0 maximum callbacks already registered for this keyword
- > 0 which is the number of callbacks assosiated with this keyword.
----
##### parser_next(PARSER *parser)
Return the next item to parse
Input
- parser
- The parser object as returned by the `parser_init`
Output
- The parser will store internally the next item to parse
Returns
- 0 Next item fetched successfully
- 1 No more items to parse
----
##### parser_action(PARSER *parser, char *input)
Return the next item to parse
Input
- parser
- The parser object as returned by the `parser_init`
- input
- Process the input specified instead of using the internal buffer
Output
- The current keyword will be processed by calling all the registered callbacks
Returns
- 0 Callbacks called successfully
- 1 Failed
----
##### parser_destroy(PARSER *parser)
Cleanup a previously allocated parser
Input
- parser
- The parser object as returned by the `parser_init`
Output
- The parser is deallocated
Returns
- none
----
##### parser_recover_input(PARSER *parser)
Cleanup a previously allocated parser
Input
- parser
- The parser object as returned by the `parser_init`
Output
- The parser is deallocated
Returns
- none

352
parser/parser.c Normal file
View File

@@ -0,0 +1,352 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "parser.h"
static inline int find_keyword(char *str, char *keyword, int max_size, int (*custom_isspace)(char))
{
char *s = str, *keyword_start;
while (unlikely(custom_isspace(*s))) s++;
keyword_start = s;
while (likely(*s && !custom_isspace(*s)) && max_size > 0) {
*keyword++ = *s++;
max_size--;
}
*keyword = '\0';
return max_size == 0 ? 0 : (int) (s - keyword_start);
}
/*
* Initialize a parser
* user : as defined by the user, will be shared across calls
* input : main input stream (auto detect stream -- file, socket, pipe)
* buffer : This is the buffer to be used (if null a buffer of size will be allocated)
* size : buffer size either passed or will be allocated
* If the buffer is auto allocated, it will auto freed when the parser is destroyed
*
*
*/
PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags)
{
PARSER *parser;
parser = callocz(1, sizeof(*parser));
if (unlikely(!parser))
return NULL;
parser->plugins_action = callocz(1, sizeof(PLUGINSD_ACTION));
if (unlikely(!parser->plugins_action)) {
freez(parser);
return NULL;
}
parser->user = user;
parser->input = input;
parser->flags = flags;
parser->host = host;
#ifdef ENABLE_HTTPS
parser->bytesleft = 0;
parser->readfrom = NULL;
#endif
if (unlikely(!(flags & PARSER_NO_PARSE_INIT))) {
int rc = parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
rc += parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
rc += parser_add_keyword(parser, "SET", pluginsd_set);
}
return parser;
}
/*
* Push a new line into the parsing stream
*
* This line will be the next one to process ie the next fetch will get this one
*
*/
int parser_push(PARSER *parser, char *line)
{
PARSER_DATA *tmp_parser_data;
if (unlikely(!parser))
return 1;
if (unlikely(!line))
return 0;
tmp_parser_data = callocz(1, sizeof(*tmp_parser_data));
tmp_parser_data->line = strdupz(line);
tmp_parser_data->next = parser->data;
parser->data = tmp_parser_data;
return 0;
}
/*
* Add a keyword and the corresponding function that will be called
* Multiple functions may be added
* Input : keyword
* : callback function
* : flags
* Output: > 0 registered function number
* : 0 Error
*/
int parser_add_keyword(PARSER *parser, char *keyword, keyword_function func)
{
PARSER_KEYWORD *tmp_keyword;
if (strcmp(keyword, "_read") == 0) {
parser->read_function = (void *) func;
return 0;
}
if (strcmp(keyword, "_eof") == 0) {
parser->eof_function = (void *) func;
return 0;
}
if (strcmp(keyword, "_unknown") == 0) {
parser->unknown_function = (void *) func;
return 0;
}
uint32_t keyword_hash = simple_hash(keyword);
tmp_keyword = parser->keyword;
while (tmp_keyword) {
if (tmp_keyword->keyword_hash == keyword_hash && (!strcmp(tmp_keyword->keyword, keyword))) {
if (tmp_keyword->func_no == PARSER_MAX_CALLBACKS)
return 0;
tmp_keyword->func[tmp_keyword->func_no++] = (void *) func;
return tmp_keyword->func_no;
}
tmp_keyword = tmp_keyword->next;
}
tmp_keyword = callocz(1, sizeof(*tmp_keyword));
tmp_keyword->keyword = strdupz(keyword);
tmp_keyword->keyword_hash = keyword_hash;
tmp_keyword->func[tmp_keyword->func_no++] = (void *) func;
tmp_keyword->next = parser->keyword;
parser->keyword = tmp_keyword;
return tmp_keyword->func_no;
}
/*
* Cleanup a previously allocated parser
*/
void parser_destroy(PARSER *parser)
{
if (unlikely(!parser))
return;
PARSER_KEYWORD *tmp_keyword, *tmp_keyword_next;
PARSER_DATA *tmp_parser_data, *tmp_parser_data_next;
// Remove keywords
tmp_keyword = parser->keyword;
while (tmp_keyword) {
tmp_keyword_next = tmp_keyword->next;
freez(tmp_keyword->keyword);
freez(tmp_keyword);
tmp_keyword = tmp_keyword_next;
}
// Remove pushed data if any
tmp_parser_data = parser->data;
while (tmp_parser_data) {
tmp_parser_data_next = tmp_parser_data->next;
freez(tmp_parser_data->line);
freez(tmp_parser_data);
tmp_parser_data = tmp_parser_data_next;
}
freez(parser->plugins_action);
freez(parser);
return;
}
/*
* Fetch the next line to process
*
*/
int parser_next(PARSER *parser)
{
char *tmp = NULL;
if (unlikely(!parser))
return 1;
parser->flags &= ~(PARSER_INPUT_PROCESSED);
PARSER_DATA *tmp_parser_data = parser->data;
if (unlikely(tmp_parser_data)) {
strncpyz(parser->buffer, tmp_parser_data->line, PLUGINSD_LINE_MAX);
parser->data = tmp_parser_data->next;
freez(tmp_parser_data->line);
freez(tmp_parser_data);
return 0;
}
#ifdef ENABLE_HTTPS
int normalread = 1;
if (netdata_srv_ctx) {
if (parser->host->stream_ssl.conn && !parser->host->stream_ssl.flags) {
tmp = parser->buffer;
if (!parser->bytesleft) {
parser->readfrom = parser->tmpbuffer;
parser->bytesleft =
pluginsd_update_buffer(parser->readfrom, parser->host->stream_ssl.conn);
if (parser->bytesleft <= 0) {
return 1;
}
}
parser->readfrom = pluginsd_get_from_buffer(
parser->buffer, &parser->bytesleft, parser->readfrom,
parser->host->stream_ssl.conn, parser->tmpbuffer);
if (!parser->readfrom)
tmp = NULL;
normalread = 0;
}
}
if (normalread) {
if (unlikely(parser->read_function))
tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->input);
else
tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->input);
}
#else
if (unlikely(parser->read_function))
tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->input);
else
tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->input);
#endif
if (unlikely(!tmp)) {
if (unlikely(parser->eof_function)) {
int rc = parser->eof_function(parser->input);
error("read failed: user defined function returned %d", rc);
}
else {
if (feof((FILE *)parser->input))
error("read failed: end of file");
else if (ferror((FILE *)parser->input))
error("read failed: input error");
else
error("read failed: unknown error");
}
return 1;
}
return 0;
}
/*
* Takes an initialized parser object that has an unprocessed entry (by calling parser_next)
* and if it contains a valid keyword, it will execute all the callbacks
*
*/
inline int parser_action(PARSER *parser, char *input)
{
PARSER_RC rc = PARSER_RC_OK;
char *words[PLUGINSD_MAX_WORDS] = { NULL };
char command[PLUGINSD_LINE_MAX];
keyword_function action_function;
keyword_function *action_function_list = NULL;
if (unlikely(!parser))
return 1;
parser->recover_location[0] = 0x0;
// if not direct input check if we have reprocessed this
if (unlikely(!input && parser->flags & PARSER_INPUT_PROCESSED))
return 0;
PARSER_KEYWORD *tmp_keyword = parser->keyword;
if (unlikely(!tmp_keyword)) {
return 1;
}
if (unlikely(!input))
input = parser->buffer;
if (unlikely(!find_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space)))
return 1;
if ((parser->flags & PARSER_INPUT_ORIGINAL) == PARSER_INPUT_ORIGINAL)
pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS);
else
pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
uint32_t command_hash = simple_hash(command);
while(tmp_keyword) {
if (command_hash == tmp_keyword->keyword_hash &&
(!strcmp(command, tmp_keyword->keyword))) {
action_function_list = &tmp_keyword->func[0];
break;
}
tmp_keyword = tmp_keyword->next;
}
if (unlikely(!action_function_list)) {
if (unlikely(parser->unknown_function))
rc = parser->unknown_function(words, parser->user, NULL);
else
rc = PARSER_RC_ERROR;
#ifdef NETDATA_INTERNAL_CHECKS
error("Unknown keyword [%s]", words[0]);
#endif
}
else {
while ((action_function = *action_function_list) != NULL) {
rc = action_function(words, parser->user, parser->plugins_action);
if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP))
break;
action_function_list++;
}
}
if (likely(input == parser->buffer))
parser->flags |= PARSER_INPUT_PROCESSED;
return (rc == PARSER_RC_ERROR);
}
inline int parser_recover_input(PARSER *parser)
{
if (unlikely(!parser))
return 1;
for(int i=0; i < PARSER_MAX_RECOVER_KEYWORDS && parser->recover_location[i]; i++)
*(parser->recover_location[i]) = parser->recover_input[i];
parser->recover_location[0] = 0x0;
return 0;
}

105
parser/parser.h Normal file
View File

@@ -0,0 +1,105 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_INCREMENTAL_PARSER_H
#define NETDATA_INCREMENTAL_PARSER_H 1
#include "../daemon/common.h"
#define PARSER_MAX_CALLBACKS 20
#define PARSER_MAX_RECOVER_KEYWORDS 128
// PARSER return codes
typedef enum parser_rc {
PARSER_RC_OK, // Callback was successful, go on
PARSER_RC_STOP, // Callback says STOP
PARSER_RC_ERROR // Callback failed (abort rest of callbacks)
} PARSER_RC;
typedef struct pluginsd_action {
PARSER_RC (*set_action)(void *user, RRDSET *st, RRDDIM *rd, long long int value);
PARSER_RC (*begin_action)(void *user, RRDSET *st, usec_t microseconds, int trust_durations);
PARSER_RC (*end_action)(void *user, RRDSET *st);
PARSER_RC (*chart_action)
(void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin,
char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options);
PARSER_RC (*dimension_action)
(void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options,
RRD_ALGORITHM algorithm_type);
PARSER_RC (*flush_action)(void *user, RRDSET *st);
PARSER_RC (*disable_action)(void *user);
PARSER_RC (*variable_action)(void *user, RRDHOST *host, RRDSET *st, char *name, int global, calculated_number value);
PARSER_RC (*label_action)(void *user, char *key, char *value, LABEL_SOURCE source);
PARSER_RC (*overwrite_action)(void *user, RRDHOST *host, struct label *new_labels);
} PLUGINSD_ACTION;
typedef enum parser_input_type {
PARSER_INPUT_SPLIT = 1 << 1,
PARSER_INPUT_ORIGINAL = 1 << 2,
PARSER_INPUT_PROCESSED = 1 << 3,
PARSER_NO_PARSE_INIT = 1 << 4,
PARSER_NO_ACTION_INIT = 1 << 5,
} PARSER_INPUT_TYPE;
#define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL)
typedef PARSER_RC (*keyword_function)(char **, void *, PLUGINSD_ACTION *plugins_action);
typedef struct parser_keyword {
char *keyword;
uint32_t keyword_hash;
int func_no;
keyword_function func[PARSER_MAX_CALLBACKS+1];
struct parser_keyword *next;
} PARSER_KEYWORD;
typedef struct parser_data {
char *line;
struct parser_data *next;
} PARSER_DATA;
typedef struct parser {
uint8_t version; // Parser version
RRDHOST *host;
void *input; // Input source e.g. stream
PARSER_DATA *data; // extra input
PARSER_KEYWORD *keyword; // List of parse keywords and functions
PLUGINSD_ACTION *plugins_action;
void *user; // User defined structure to hold extra state between calls
uint32_t flags;
char *(*read_function)(char *buffer, long unsigned int, void *input);
int (*eof_function)(void *input);
keyword_function unknown_function;
char buffer[PLUGINSD_LINE_MAX];
char *recover_location[PARSER_MAX_RECOVER_KEYWORDS+1];
char recover_input[PARSER_MAX_RECOVER_KEYWORDS];
#ifdef ENABLE_HTTPS
int bytesleft;
char tmpbuffer[PLUGINSD_LINE_MAX];
char *readfrom;
#endif
} PARSER;
PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags);
int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func);
int parser_next(PARSER *working_parser);
int parser_action(PARSER *working_parser, char *input);
int parser_push(PARSER *working_parser, char *line);
void parser_destroy(PARSER *working_parser);
int parser_recover_input(PARSER *working_parser);
extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations);
extern PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action);
extern PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action);
#endif

View File

@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
#include "../parser/parser.h"
/*
* rrdpush
@@ -1214,6 +1215,7 @@ static int rrdpush_receive(int fd
.obsolete = 0,
.started_t = now_realtime_sec(),
.next = NULL,
.version = 0,
};
// put the client IP and port into the buffers used by plugins.d
@@ -1289,6 +1291,7 @@ static int rrdpush_receive(int fd
info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port);
log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "CONNECTED");
cd.version = stream_version;
size_t count = pluginsd_process(host, &cd, fp, 1);
log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "DISCONNECTED");