1
0
mirror of https://github.com/netdata/netdata.git synced 2021-06-06 23:03:21 +03:00

Implement the main flow for the Exporting Engine (#7149)

* Add top level tests

* Add a skeleton for preparing buffers

* Initialize graphite instance

* Prepare buffers for all instances

* Add Grafite collected value formatter

* Add support for exporting.conf read and parsing

* - Use new exporting_config instead of netdata_config

* Implement Grafite worker

* Disable exporting engine compilation if libuv is not available

* Add mutex locks

- Configure connectors as connector_<type> in sections of exporting.conf

- Change exporting_select_type to check for connector_ fields

* - Override exporting_config structure if there no exporting.conf so that
  look ups don't fail and we maintain backwards compatibility

* Separate fixtures in unit tests

* Test exporting_discard_responce

* Test response receiving

* Test buffer sending

* Test simple connector worker

- Instance section has the format connector:instance_name
  e.g graphite:my_graphite_instance

- Connectors with : in their name e.g graphite:plaintext are reserved
  So graphite:plaintext is not accepted because it would activate an
  instance with name "plaintext"
  It should be graphite:plaintext:instance_name

* - Enable the add_connector_instance to cleanup the internal structure
  by passing NULL,not NULL arguments

* Implement configurable update interval

- Add additional check to verify instance uniqueness across connectors

* Add host and chart filters

* Add the value calculation over a database series

* Add the calculated over stored data graphite connector

* Add tests for graphite connector

* Add JSON connector

* Add tests for JSON formatting functions

* Add OpenTSDB connector

* Add tests for the OpenTSDB connector

* Add temporaty notes to the documentation
This commit is contained in:
Vladimir Kobal
2019-12-12 21:41:11 +02:00
committed by GitHub
parent 7278d5bcd9
commit 6f27081912
44 changed files with 3832 additions and 47 deletions

View File

@@ -31,6 +31,7 @@ IF("${CMAKE_BUILD_TYPE}" MATCHES "Debug")
set(CXX_FORMAT_SECURITY "-Werror=format-security")
set(CXX_STACK_PROTECTOR "-fstack-protector-all")
set(CXX_FLAGS_DEBUG "-O0")
set(CMAKE_C_STANDARD 99)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O1 -ggdb -Wall -Wextra -DNETDATA_INTERNAL_CHECKS=1 -DNETDATA_VERIFY_LOCKS=1 ${CXX_FORMAT_SIGNEDNESS} ${CXX_FORMAT_SECURITY} ${CXX_STACK_PROTECTOR} ${CXX_FLAGS_DEBUG}")
ELSE()
message(STATUS "building for: release")
@@ -600,6 +601,23 @@ set(BACKENDS_PLUGIN_FILES
backends/prometheus/backend_prometheus.h
)
set(EXPORTING_ENGINE_FILES
exporting/exporting_engine.c
exporting/exporting_engine.h
exporting/graphite/graphite.c
exporting/graphite/graphite.h
exporting/json/json.c
exporting/json/json.h
exporting/opentsdb/opentsdb.c
exporting/opentsdb/opentsdb.h
exporting/read_config.c
exporting/init_connectors.c
exporting/process_data.c
exporting/check_filters.c
exporting/send_data.c
exporting/send_internal_metrics.c
)
set(KINESIS_BACKEND_FILES
backends/aws_kinesis/aws_kinesis.c
backends/aws_kinesis/aws_kinesis.h
@@ -639,6 +657,7 @@ set(NETDATA_FILES
${DAEMON_FILES}
${API_PLUGIN_FILES}
${BACKENDS_PLUGIN_FILES}
${EXPORTING_ENGINE_FILES}
${CHECKS_PLUGIN_FILES}
${HEALTH_PLUGIN_FILES}
${IDLEJITTER_PLUGIN_FILES}
@@ -916,6 +935,48 @@ if(BUILD_TESTING)
target_link_libraries(storage_number_testdriver libnetdata ${NETDATA_COMMON_LIBRARIES} ${CMOCKA_LIBRARIES})
add_test(NAME test_storage_number COMMAND storage_number_testdriver)
set(EXPORTING_ENGINE_TEST_FILES
exporting/tests/test_exporting_engine.c
exporting/tests/test_exporting_engine.h
exporting/tests/exporting_fixtures.c
exporting/tests/exporting_doubles.c
exporting/tests/netdata_doubles.c
exporting/tests/system_doubles.c
)
set(TEST_NAME exporting_engine)
add_executable(${TEST_NAME}_testdriver ${EXPORTING_ENGINE_TEST_FILES} ${EXPORTING_ENGINE_FILES})
target_compile_options(
${TEST_NAME}_testdriver
PRIVATE
-DUNIT_TESTING
)
target_link_options(
${TEST_NAME}_testdriver
PRIVATE
-Wl,--wrap=read_exporting_config
-Wl,--wrap=init_connectors
-Wl,--wrap=mark_scheduled_instances
-Wl,--wrap=rrdhost_is_exportable
-Wl,--wrap=rrdset_is_exportable
-Wl,--wrap=exporting_calculate_value_from_stored_data
-Wl,--wrap=prepare_buffers
-Wl,--wrap=notify_workers
-Wl,--wrap=send_internal_metrics
-Wl,--wrap=now_realtime_sec
-Wl,--wrap=uv_thread_create
-Wl,--wrap=uv_mutex_lock
-Wl,--wrap=uv_mutex_unlock
-Wl,--wrap=uv_cond_signal
-Wl,--wrap=uv_cond_wait
-Wl,--wrap=strdupz
-Wl,--wrap=info_int
-Wl,--wrap=recv
-Wl,--wrap=send
-Wl,--wrap=connect_to_one_of
)
target_link_libraries(${TEST_NAME}_testdriver libnetdata ${NETDATA_COMMON_LIBRARIES} ${CMOCKA_LIBRARIES})
add_test(NAME test_${TEST_NAME} COMMAND ${TEST_NAME}_testdriver)
set(WEB_API_TEST_FILES
web/api/tests/web_api.c
web/server/web_client.c
@@ -969,11 +1030,11 @@ if(BUILD_TESTING)
set_target_properties(
str2ld_testdriver
storage_number_testdriver
exporting_engine_testdriver
web_api_testdriver
valid_urls_testdriver
PROPERTIES RUNTIME_OUTPUT_DIRECTORY tests
)
endif()
endif()

View File

@@ -93,6 +93,7 @@ SUBDIRS += \
collectors \
daemon \
database \
exporting \
health \
libnetdata \
registry \
@@ -453,6 +454,23 @@ BACKENDS_PLUGIN_FILES = \
backends/prometheus/backend_prometheus.h \
$(NULL)
EXPORTING_ENGINE_FILES = \
exporting/exporting_engine.c \
exporting/exporting_engine.h \
exporting/graphite/graphite.c \
exporting/graphite/graphite.h \
exporting/json/json.c \
exporting/json/json.h \
exporting/opentsdb/opentsdb.c \
exporting/opentsdb/opentsdb.h \
exporting/read_config.c \
exporting/init_connectors.c \
exporting/process_data.c \
exporting/check_filters.c \
exporting/send_data.c \
exporting/send_internal_metrics.c \
$(NULL)
KINESIS_BACKEND_FILES = \
backends/aws_kinesis/aws_kinesis.c \
backends/aws_kinesis/aws_kinesis.h \
@@ -527,6 +545,12 @@ if LINUX
endif
if ENABLE_EXPORTING
NETDATA_FILES += \
$(EXPORTING_ENGINE_FILES) \
$(NULL)
endif
NETDATA_COMMON_LIBS = \
$(OPTIONAL_MATH_LIBS) \
$(OPTIONAL_ZLIB_LIBS) \
@@ -666,6 +690,7 @@ if ENABLE_UNITTESTS
check_PROGRAMS = \
libnetdata/tests/str2ld_testdriver \
libnetdata/storage_number/tests/storage_number_testdriver \
exporting/tests/exporting_engine_testdriver \
web/api/tests/web_api_testdriver \
web/api/tests/valid_urls_testdriver \
$(NULL)
@@ -729,4 +754,46 @@ if ENABLE_UNITTESTS
$(LIBNETDATA_FILES) \
$(NULL)
libnetdata_storage_number_tests_storage_number_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS)
EXPORTING_ENGINE_TEST_FILES = \
exporting/tests/test_exporting_engine.c \
exporting/tests/test_exporting_engine.h \
exporting/tests/exporting_fixtures.c \
exporting/tests/exporting_doubles.c \
exporting/tests/netdata_doubles.c \
exporting/tests/system_doubles.c \
$(NULL)
exporting_tests_exporting_engine_testdriver_SOURCES = \
$(EXPORTING_ENGINE_TEST_FILES) \
$(EXPORTING_ENGINE_FILES) \
$(LIBNETDATA_FILES) \
$(NULL)
exporting_tests_exporting_engine_testdriver_CFLAGS = \
$(AM_CFLAGS) \
-DUNIT_TESTING \
$(NULL)
exporting_tests_exporting_engine_testdriver_LDFLAGS = \
-Wl,--wrap=read_exporting_config \
-Wl,--wrap=init_connectors \
-Wl,--wrap=mark_scheduled_instances \
-Wl,--wrap=rrdhost_is_exportable \
-Wl,--wrap=rrdset_is_exportable \
-Wl,--wrap=exporting_calculate_value_from_stored_data \
-Wl,--wrap=prepare_buffers \
-Wl,--wrap=notify_workers \
-Wl,--wrap=send_internal_metrics \
-Wl,--wrap=now_realtime_sec \
-Wl,--wrap=uv_thread_create \
-Wl,--wrap=uv_mutex_lock \
-Wl,--wrap=uv_mutex_unlock \
-Wl,--wrap=uv_cond_signal \
-Wl,--wrap=uv_cond_wait \
-Wl,--wrap=strdupz \
-Wl,--wrap=info_int \
-Wl,--wrap=recv \
-Wl,--wrap=send \
-Wl,--wrap=connect_to_one_of \
$(TEST_LDFLAGS) \
$(NULL)
exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS)
endif

View File

@@ -269,9 +269,9 @@ void backend_set_kinesis_variables(int *default_port,
#if HAVE_KINESIS
*brc = process_json_response;
if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
*brf = format_dimension_collected_json_plaintext;
*brf = backends_format_dimension_collected_json_plaintext;
else
*brf = format_dimension_stored_json_plaintext;
*brf = backends_format_dimension_stored_json_plaintext;
#endif
}
@@ -321,9 +321,9 @@ void backend_set_mongodb_variables(int *default_port,
#if HAVE_MONGOC
*brc = process_json_response;
if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
*brf = format_dimension_collected_json_plaintext;
*brf = backends_format_dimension_collected_json_plaintext;
else
*brf = format_dimension_stored_json_plaintext;
*brf = backends_format_dimension_stored_json_plaintext;
#endif
}
@@ -344,9 +344,9 @@ void backend_set_json_variables(int *default_port,
*brc = process_json_response;
if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
*brf = format_dimension_collected_json_plaintext;
*brf = backends_format_dimension_collected_json_plaintext;
else
*brf = format_dimension_stored_json_plaintext;
*brf = backends_format_dimension_stored_json_plaintext;
}
/**
@@ -366,9 +366,9 @@ void backend_set_opentsdb_http_variables(int *default_port,
*brc = process_opentsdb_response;
if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
*brf = format_dimension_collected_opentsdb_http;
*brf = backends_format_dimension_collected_opentsdb_http;
else
*brf = format_dimension_stored_opentsdb_http;
*brf = backends_format_dimension_stored_opentsdb_http;
}
@@ -389,9 +389,9 @@ void backend_set_opentsdb_telnet_variables(int *default_port,
*brc = process_opentsdb_response;
if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
*brf = format_dimension_collected_opentsdb_telnet;
*brf = backends_format_dimension_collected_opentsdb_telnet;
else
*brf = format_dimension_stored_opentsdb_telnet;
*brf = backends_format_dimension_stored_opentsdb_telnet;
}
/**
@@ -411,9 +411,9 @@ void backend_set_graphite_variables(int *default_port,
*brc = process_graphite_response;
if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
*brf = format_dimension_collected_graphite_plaintext;
*brf = backends_format_dimension_collected_graphite_plaintext;
else
*brf = format_dimension_stored_graphite_plaintext;
*brf = backends_format_dimension_stored_graphite_plaintext;
}
/**
@@ -624,6 +624,9 @@ void *backends_main(void *ptr) {
case BACKEND_TYPE_UNKNOWN: {
break;
}
default: {
break;
}
}
#if ENABLE_PROMETHEUS_REMOTE_WRITE

View File

@@ -23,9 +23,13 @@ typedef enum backend_types {
BACKEND_TYPE_JSON, // Stores the data using JSON.
BACKEND_TYPE_PROMETEUS, // The user selected to use Prometheus backend
BACKEND_TYPE_KINESIS, // Send message to AWS Kinesis
BACKEND_TYPE_MONGODB // Send data to MongoDB collection
BACKEND_TYPE_MONGODB, // Send data to MongoDB collection
BACKEND_TYPE_NUM // Number of backend types
} BACKEND_TYPE;
#ifdef ENABLE_EXPORTING
#include "exporting/exporting_engine.h"
#endif
typedef int (**backend_response_checker_t)(BUFFER *);
typedef int (**backend_request_formatter_t)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS);
@@ -38,6 +42,7 @@ extern BACKEND_OPTIONS global_backend_options;
extern const char *global_backend_prefix;
extern void *backends_main(void *ptr);
BACKEND_TYPE backend_select_type(const char *type);
extern BACKEND_OPTIONS backend_parse_data_source(const char *source, BACKEND_OPTIONS backend_options);

View File

@@ -6,7 +6,7 @@
// ----------------------------------------------------------------------------
// graphite backend
int format_dimension_collected_graphite_plaintext(
int backends_format_dimension_collected_graphite_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -42,7 +42,7 @@ int format_dimension_collected_graphite_plaintext(
return 1;
}
int format_dimension_stored_graphite_plaintext(
int backends_format_dimension_stored_graphite_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from

View File

@@ -6,7 +6,7 @@
#include "backends/backends.h"
extern int format_dimension_collected_graphite_plaintext(
extern int backends_format_dimension_collected_graphite_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -18,7 +18,7 @@ extern int format_dimension_collected_graphite_plaintext(
, BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
);
extern int format_dimension_stored_graphite_plaintext(
extern int backends_format_dimension_stored_graphite_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from

View File

@@ -6,7 +6,7 @@
// ----------------------------------------------------------------------------
// json backend
int format_dimension_collected_json_plaintext(
int backends_format_dimension_collected_json_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -74,7 +74,7 @@ int format_dimension_collected_json_plaintext(
return 1;
}
int format_dimension_stored_json_plaintext(
int backends_format_dimension_stored_json_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from

View File

@@ -5,7 +5,7 @@
#include "backends/backends.h"
extern int format_dimension_collected_json_plaintext(
extern int backends_format_dimension_collected_json_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -17,7 +17,7 @@ extern int format_dimension_collected_json_plaintext(
, BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
);
extern int format_dimension_stored_json_plaintext(
extern int backends_format_dimension_stored_json_plaintext(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from

View File

@@ -6,7 +6,7 @@
// ----------------------------------------------------------------------------
// opentsdb backend
int format_dimension_collected_opentsdb_telnet(
int backends_format_dimension_collected_opentsdb_telnet(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -42,7 +42,7 @@ int format_dimension_collected_opentsdb_telnet(
return 1;
}
int format_dimension_stored_opentsdb_telnet(
int backends_format_dimension_stored_opentsdb_telnet(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -103,7 +103,7 @@ static inline void opentsdb_build_message(BUFFER *b, char *message, const char *
);
}
int format_dimension_collected_opentsdb_http(
int backends_format_dimension_collected_opentsdb_http(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -151,7 +151,7 @@ int format_dimension_collected_opentsdb_http(
return 1;
}
int format_dimension_stored_opentsdb_http(
int backends_format_dimension_stored_opentsdb_http(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from

View File

@@ -5,7 +5,7 @@
#include "backends/backends.h"
extern int format_dimension_collected_opentsdb_telnet(
extern int backends_format_dimension_collected_opentsdb_telnet(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -17,7 +17,7 @@ extern int format_dimension_collected_opentsdb_telnet(
, BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
);
extern int format_dimension_stored_opentsdb_telnet(
extern int backends_format_dimension_stored_opentsdb_telnet(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -31,7 +31,7 @@ extern int format_dimension_stored_opentsdb_telnet(
extern int process_opentsdb_response(BUFFER *b);
int format_dimension_collected_opentsdb_http(
int backends_format_dimension_collected_opentsdb_http(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from
@@ -43,7 +43,7 @@ int format_dimension_collected_opentsdb_http(
, BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
);
int format_dimension_stored_opentsdb_http(
int backends_format_dimension_stored_opentsdb_http(
BUFFER *b // the buffer to write data to
, const char *prefix // the prefix to use
, RRDHOST *host // the host this chart comes from

View File

@@ -36,6 +36,7 @@ m4_ifdef([AM_SILENT_RULES], [
])
AC_CANONICAL_HOST
AC_PROG_CC
AC_PROG_CC_C99
AM_PROG_CC_C_O
AC_PROG_CXX
AC_PROG_INSTALL
@@ -407,6 +408,20 @@ fi
AC_MSG_RESULT([${enable_https}])
AM_CONDITIONAL([ENABLE_HTTPS], [test "${enable_https}" = "yes"])
# -----------------------------------------------------------------------------
# Exporting engine
AC_MSG_CHECKING([if netdata exporting engine should be used])
if test "${UV_LIBS}"; then
enable_exporting_engine="yes"
AC_DEFINE([ENABLE_EXPORTING], [1], [netdata exporting engine usability])
OPTIONAL_UV_CFLAGS="${UV_CFLAGS}"
OPTIONAL_UV_LIBS="${UV_LIBS}"
else
enable_exporting_engine="no"
fi
AC_MSG_RESULT([${enable_exporting_engine}])
AM_CONDITIONAL([ENABLE_EXPORTING], [test "${enable_exporting_engine}" = "yes"])
# -----------------------------------------------------------------------------
# JSON-C
test "${enable_jsonc}" = "yes" -a -z "${JSONC_LIBS}" && \
@@ -1209,6 +1224,11 @@ AC_CONFIG_FILES([
database/Makefile
database/engine/Makefile
diagrams/Makefile
exporting/Makefile
exporting/graphite/Makefile
exporting/json/Makefile
exporting/opentsdb/Makefile
exporting/tests/Makefile
health/Makefile
health/notifications/Makefile
libnetdata/Makefile

View File

@@ -27,7 +27,6 @@
#define config_generate(buffer, only_changed) appconfig_generate(&netdata_config, buffer, only_changed)
// ----------------------------------------------------------------------------
// netdata include files

View File

@@ -75,6 +75,9 @@ struct netdata_static_thread static_threads[] = {
// common plugins for all systems
{"BACKENDS", NULL, NULL, 1, NULL, NULL, backends_main},
#ifdef ENABLE_EXPORTING
{"EXPORTING", NULL, NULL, 1, NULL, NULL, exporting_main},
#endif
{"WEB_SERVER[static1]", NULL, NULL, 0, NULL, NULL, socket_listen_main_static_threaded},
{"STREAM", NULL, NULL, 0, NULL, NULL, rrdpush_sender_thread},
@@ -1105,6 +1108,7 @@ int main(int argc, char **argv) {
if(!config_loaded)
load_netdata_conf(NULL, 0);
// ------------------------------------------------------------------------
// initialize netdata
{

View File

@@ -408,6 +408,7 @@ struct rrdset {
// it goes around in a round-robin fashion
RRDSET_FLAGS flags; // configuration flags
RRDSET_FLAGS *exporting_flags; // array of flags for exporting connector instances
int gap_when_lost_iterations_above; // after how many lost iterations a gap should be stored
// netdata will interpolate values for gaps lower than this
@@ -627,6 +628,7 @@ struct rrdhost {
const char *timezone; // the timezone of the host
RRDHOST_FLAGS flags; // flags about this RRDHOST
RRDHOST_FLAGS *exporting_flags; // array of flags for exporting connector instances
int rrd_update_every; // the update frequency of the host
long rrd_history_entries; // the number of history entries for the host's charts

View File

@@ -597,6 +597,8 @@ void rrdhost_free(RRDHOST *host) {
while(host->rrdset_root)
rrdset_free(host->rrdset_root);
freez(host->exporting_flags);
while(host->alarms)
rrdcalc_unlink_and_free(host, host->alarms);

View File

@@ -328,6 +328,8 @@ void rrdset_free(RRDSET *st) {
// ------------------------------------------------------------------------
// free its children structures
freez(st->exporting_flags);
while(st->variables) rrdsetvar_free(st->variables);
while(st->alarms) rrdsetcalc_unlink(st->alarms);
while(st->dimensions) rrddim_free(st, st->dimensions);

15
exporting/Makefile.am Normal file
View File

@@ -0,0 +1,15 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
SUBDIRS = \
tests \
graphite \
json \
opentsdb \
$(NULL)
dist_noinst_DATA = \
README.md \
$(NULL)

45
exporting/README.md Normal file
View File

@@ -0,0 +1,45 @@
# Exporting metrics to external databases (experimental)
The exporting engine is an update for the former [backends](../backends/). It's still work in progress. It has a
modular structure and supports metric exporting via multiple exporting connector instances at the same time. You can
have different update intervals and filters configured for every exporting connector instance. The exporting engine has
its own configuration file `exporting.conf`. Configuration is almost similar to [backends](../backends/#configuration).
The only difference is that the type of a connector should be specified in a section name before a colon and a name after
the colon. At the moment only four types of connectors are supported: `graphite`, `json`, `opentsdb`, `opentsdb:http`.
An example configuration:
```conf
[exporting:global]
enabled = yes
[graphite:my_instance1]
enabled = yes
destination = localhost:2003
data source = sum
update every = 5
send charts matching = system.load
[json:my_instance2]
enabled = yes
destination = localhost:5448
data source = as collected
update every = 2
send charts matching = system.active_processes
[opentsdb:my_instance3]
enabled = yes
destination = localhost:4242
data source = sum
update every = 10
send charts matching = system.cpu
[opentsdb:http:my_instance4]
enabled = yes
destination = localhost:4243
data source = average
update every = 3
send charts matching = system.active_processes
```
[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fexporting%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>)

78
exporting/check_filters.c Normal file
View File

@@ -0,0 +1,78 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "exporting_engine.h"
/**
* Check if the connector instance should export the host metrics
*
* @param instance an exporting connector instance.
* @param host a data collecting host.
* @return Returns 1 if the connector instance should export the host metrics
*/
int rrdhost_is_exportable(struct instance *instance, RRDHOST *host)
{
if (host->exporting_flags == NULL)
host->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
RRDHOST_FLAGS *flags = &host->exporting_flags[instance->index];
if (unlikely((*flags & (RRDHOST_FLAG_BACKEND_SEND | RRDHOST_FLAG_BACKEND_DONT_SEND)) == 0)) {
char *host_name = (host == localhost) ? "localhost" : host->hostname;
if (!instance->config.hosts_pattern || simple_pattern_matches(instance->config.hosts_pattern, host_name)) {
*flags |= RRDHOST_FLAG_BACKEND_SEND;
info("enabled exporting of host '%s' for instance '%s'", host_name, instance->config.name);
} else {
*flags |= RRDHOST_FLAG_BACKEND_DONT_SEND;
info("disabled exporting of host '%s' for instance '%s'", host_name, instance->config.name);
}
}
if (likely(*flags & RRDHOST_FLAG_BACKEND_SEND))
return 1;
else
return 0;
}
/**
* Check if the connector instance should export the chart
*
* @param instance an exporting connector instance.
* @param st a chart.
* @return Returns 1 if the connector instance should export the chart
*/
int rrdset_is_exportable(struct instance *instance, RRDSET *st)
{
RRDHOST *host = st->rrdhost;
if (st->exporting_flags == NULL)
st->exporting_flags = callocz(instance->connector->engine->instance_num, sizeof(size_t));
RRDSET_FLAGS *flags = &st->exporting_flags[instance->index];
if(unlikely(*flags & RRDSET_FLAG_BACKEND_IGNORE))
return 0;
if(unlikely(!(*flags & RRDSET_FLAG_BACKEND_SEND))) {
// we have not checked this chart
if(simple_pattern_matches(instance->config.charts_pattern, st->id) || simple_pattern_matches(instance->config.charts_pattern, st->name))
*flags |= RRDSET_FLAG_BACKEND_SEND;
else {
*flags |= RRDSET_FLAG_BACKEND_IGNORE;
debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is disabled for backends.", st->id, host->hostname);
return 0;
}
}
if(unlikely(!rrdset_is_available_for_backends(st))) {
debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is not available for backends.", st->id, host->hostname);
return 0;
}
if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE && !(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED))) {
debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s' because its memory mode is '%s' and the backend requires database access.", st->id, host->hostname, rrd_memory_mode_name(host->rrd_memory_mode));
return 0;
}
return 1;
}

View File

@@ -0,0 +1,60 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "exporting_engine.h"
/**
* Exporting engine main
*
* The main thread used to control the exporting engine.
*
* @param ptr a pointer to netdata_static_structure.
*
* @return It always returns NULL.
*/
void *exporting_main(void *ptr)
{
(void)ptr;
struct engine *engine = read_exporting_config();
if (!engine) {
info("EXPORTING: no exporting connectors configured");
return NULL;
}
if (init_connectors(engine) != 0) {
error("EXPORTING: cannot initialize exporting connectors");
return NULL;
}
usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
while (!netdata_exit) {
heartbeat_next(&hb, step_ut);
engine->now = now_realtime_sec();
if (mark_scheduled_instances(engine)) {
if (prepare_buffers(engine) != 0) {
error("EXPORTING: cannot prepare data to send");
return NULL;
}
}
if (notify_workers(engine) != 0) {
error("EXPORTING: cannot communicate with exporting connector instance working threads");
return NULL;
}
if (send_internal_metrics(engine) != 0) {
error("EXPORTING: cannot send metrics for the operation of exporting engine");
return NULL;
}
#ifdef UNIT_TESTING
break;
#endif
}
return NULL;
}

View File

@@ -0,0 +1,185 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_EXPORTING_ENGINE_H
#define NETDATA_EXPORTING_ENGINE_H 1
#include "daemon/common.h"
#include <uv.h>
#define exporter_get(section, name, value) expconfig_get(&exporting_config, section, name, value)
#define exporter_get_number(section, name, value) expconfig_get_number(&exporting_config, section, name, value)
#define exporter_get_boolean(section, name, value) expconfig_get_boolean(&exporting_config, section, name, value)
extern struct config exporting_config;
#define EXPORTER_DATA_SOURCE "data source"
#define EXPORTER_DATA_SOURCE_DEFAULT "average"
#define EXPORTER_DESTINATION "destination"
#define EXPORTER_DESTINATION_DEFAULT "localhost"
#define EXPORTER_UPDATE_EVERY "update every"
#define EXPORTER_UPDATE_EVERY_DEFAULT 10
#define EXPORTER_BUF_ONFAIL "buffer on failures"
#define EXPORTER_BUF_ONFAIL_DEFAULT 10
#define EXPORTER_TIMEOUT_MS "timeout ms"
#define EXPORTER_TIMEOUT_MS_DEFAULT 10000
#define EXPORTER_SEND_CHART_MATCH "send charts matching"
#define EXPORTER_SEND_CHART_MATCH_DEFAULT "*"
#define EXPORTER_SEND_HOST_MATCH "send hosts matching"
#define EXPORTER_SEND_HOST_MATCH_DEFAULT "localhost *"
#define EXPORTER_SEND_NAMES "send names instead of ids"
#define EXPORTER_SEND_NAMES_DEFAULT CONFIG_BOOLEAN_YES
typedef enum exporting_options {
EXPORTING_OPTION_NONE = 0,
EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0),
EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1),
EXPORTING_SOURCE_DATA_SUM = (1 << 2),
EXPORTING_OPTION_SEND_NAMES = (1 << 16)
} EXPORTING_OPTIONS;
#define EXPORTING_OPTIONS_SOURCE_BITS \
(EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM)
#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) (exporting_options & EXPORTING_OPTIONS_SOURCE_BITS)
struct engine;
struct instance_config {
const char *name;
const char *destination;
int update_every;
int buffer_on_failures;
long timeoutms;
EXPORTING_OPTIONS options;
SIMPLE_PATTERN *charts_pattern;
SIMPLE_PATTERN *hosts_pattern;
void *connector_specific_config;
};
struct simple_connector_config {
int default_port;
};
struct connector_config {
BACKEND_TYPE type;
void *connector_specific_config;
};
struct engine_config {
const char *prefix;
const char *hostname;
int update_every;
};
struct stats {
collected_number chart_buffered_metrics;
collected_number chart_lost_metrics;
collected_number chart_sent_metrics;
collected_number chart_buffered_bytes;
collected_number chart_received_bytes;
collected_number chart_sent_bytes;
collected_number chart_receptions;
collected_number chart_transmission_successes;
collected_number chart_transmission_failures;
collected_number chart_data_lost_events;
collected_number chart_lost_bytes;
collected_number chart_reconnects;
};
struct instance {
struct instance_config config;
void *buffer;
struct stats stats;
int scheduled;
int skip_host;
int skip_chart;
time_t after;
time_t before;
uv_thread_t thread;
uv_mutex_t mutex;
uv_cond_t cond_var;
int (*start_batch_formatting)(struct instance *instance);
int (*start_host_formatting)(struct instance *instance, RRDHOST *host);
int (*start_chart_formatting)(struct instance *instance, RRDSET *st);
int (*metric_formatting)(struct instance *instance, RRDDIM *rd);
int (*end_chart_formatting)(struct instance *instance, RRDSET *st);
int (*end_host_formatting)(struct instance *instance, RRDHOST *host);
int (*end_batch_formatting)(struct instance *instance);
size_t index;
struct instance *next;
struct connector *connector;
};
struct connector {
struct connector_config config;
void (*worker)(void *instance_p);
struct instance *instance_root;
struct connector *next;
struct engine *engine;
};
struct engine {
struct engine_config config;
size_t instance_num;
time_t now;
struct connector *connector_root;
};
void *exporting_main(void *ptr);
struct engine *read_exporting_config();
BACKEND_TYPE exporting_select_type(const char *type);
int init_connectors(struct engine *engine);
int mark_scheduled_instances(struct engine *engine);
int prepare_buffers(struct engine *engine);
int notify_workers(struct engine *engine);
size_t exporting_name_copy(char *dst, const char *src, size_t max_len);
int rrdhost_is_exportable(struct instance *instance, RRDHOST *host);
int rrdset_is_exportable(struct instance *instance, RRDSET *st);
calculated_number exporting_calculate_value_from_stored_data(
struct instance *instance,
RRDDIM *rd,
time_t *last_timestamp);
int start_batch_formatting(struct engine *engine);
int start_host_formatting(struct engine *engine, RRDHOST *host);
int start_chart_formatting(struct engine *engine, RRDSET *st);
int metric_formatting(struct engine *engine, RRDDIM *rd);
int end_chart_formatting(struct engine *engine, RRDSET *st);
int end_host_formatting(struct engine *engine, RRDHOST *host);
int end_batch_formatting(struct engine *engine);
int exporting_discard_response(BUFFER *buffer, struct instance *instance);
void simple_connector_receive_response(int *sock, struct instance *instance);
void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance);
void simple_connector_worker(void *instance_p);
int send_internal_metrics(struct engine *engine);
#endif /* NETDATA_EXPORTING_ENGINE_H */

View File

@@ -0,0 +1,4 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in

View File

@@ -0,0 +1,138 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "graphite.h"
/**
* Initialize Graphite connector
*
* @param instance a connector data structure.
* @return Always returns 0.
*/
int init_graphite_connector(struct connector *connector)
{
connector->worker = simple_connector_worker;
struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config));
connector->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 2003;
return 0;
}
/**
* Initialize Graphite connector instance
*
* @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/
int init_graphite_instance(struct instance *instance)
{
instance->start_batch_formatting = NULL;
instance->start_host_formatting = NULL;
instance->start_chart_formatting = NULL;
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
instance->metric_formatting = format_dimension_collected_graphite_plaintext;
else
instance->metric_formatting = format_dimension_stored_graphite_plaintext;
instance->end_chart_formatting = NULL;
instance->end_host_formatting = NULL;
instance->end_batch_formatting = NULL;
instance->buffer = (void *)buffer_create(0);
if (!instance->buffer) {
error("EXPORTING: cannot create buffer for graphite exporting connector instance %s", instance->config.name);
return 1;
}
uv_mutex_init(&instance->mutex);
uv_cond_init(&instance->cond_var);
return 0;
}
/**
* Format dimension using collected data for Graphite connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
char chart_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
chart_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
RRD_ID_LENGTH_MAX);
char dimension_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
dimension_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
RRD_ID_LENGTH_MAX);
buffer_sprintf(
instance->buffer,
"%s.%s.%s.%s%s%s " COLLECTED_NUMBER_FORMAT " %llu\n",
engine->config.prefix,
engine->config.hostname,
chart_name,
dimension_name,
(host->tags) ? ";" : "",
(host->tags) ? host->tags : "",
rd->last_collected_value,
(unsigned long long)rd->last_collected_time.tv_sec);
return 0;
}
/**
* Format dimension using a calculated value from stored data for Graphite connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
char chart_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
chart_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
RRD_ID_LENGTH_MAX);
char dimension_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
dimension_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
RRD_ID_LENGTH_MAX);
time_t last_t;
calculated_number value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
if(isnan(value))
return 0;
buffer_sprintf(
instance->buffer,
"%s.%s.%s.%s%s%s " CALCULATED_NUMBER_FORMAT " %llu\n",
engine->config.prefix,
engine->config.hostname,
chart_name,
dimension_name,
(host->tags) ? ";" : "",
(host->tags) ? host->tags : "",
value,
(unsigned long long)last_t);
return 0;
}

View File

@@ -0,0 +1,13 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_EXPORTING_GRAPHITE_H
#define NETDATA_EXPORTING_GRAPHITE_H
#include "exporting/exporting_engine.h"
int init_graphite_connector(struct connector *connector);
int init_graphite_instance(struct instance *instance);
int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd);
int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd);
#endif //NETDATA_EXPORTING_GRAPHITE_H

View File

@@ -0,0 +1,72 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "exporting_engine.h"
#include "graphite/graphite.h"
#include "json/json.h"
#include "opentsdb/opentsdb.h"
/**
* Initialize connectors
*
* @param engine an engine data structure.
* @return Returns 0 on success, 1 on failure.
*/
int init_connectors(struct engine *engine)
{
engine->now = now_realtime_sec();
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
switch (connector->config.type) {
case BACKEND_TYPE_GRAPHITE:
if (init_graphite_connector(connector) != 0)
return 1;
break;
case BACKEND_TYPE_JSON:
if (init_json_connector(connector) != 0)
return 1;
break;
case BACKEND_TYPE_OPENTSDB_USING_TELNET:
if (init_opentsdb_connector(connector) != 0)
return 1;
break;
case BACKEND_TYPE_OPENTSDB_USING_HTTP:
if (init_opentsdb_connector(connector) != 0)
return 1;
break;
default:
error("EXPORTING: unknown exporting connector type");
return 1;
}
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
instance->index = engine->instance_num++;
instance->after = engine->now;
switch (connector->config.type) {
case BACKEND_TYPE_GRAPHITE:
if (init_graphite_instance(instance) != 0)
return 1;
break;
case BACKEND_TYPE_JSON:
if (init_json_instance(instance) != 0)
return 1;
break;
case BACKEND_TYPE_OPENTSDB_USING_TELNET:
if (init_opentsdb_telnet_instance(instance) != 0)
return 1;
break;
case BACKEND_TYPE_OPENTSDB_USING_HTTP:
if (init_opentsdb_http_instance(instance) != 0)
return 1;
break;
default:
error("EXPORTING: unknown exporting connector type");
return 1;
}
// dispatch the instance worker thread
uv_thread_create(&instance->thread, connector->worker, instance);
}
}
return 0;
}

View File

@@ -0,0 +1,4 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in

194
exporting/json/json.c Normal file
View File

@@ -0,0 +1,194 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "json.h"
/**
* Initialize JSON connector
*
* @param instance a connector data structure.
* @return Always returns 0.
*/
int init_json_connector(struct connector *connector)
{
connector->worker = simple_connector_worker;
struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config));
connector->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 5448;
return 0;
}
/**
* Initialize JSON connector instance
*
* @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/
int init_json_instance(struct instance *instance)
{
instance->start_batch_formatting = NULL;
instance->start_host_formatting = NULL;
instance->start_chart_formatting = NULL;
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
instance->metric_formatting = format_dimension_collected_json_plaintext;
else
instance->metric_formatting = format_dimension_stored_json_plaintext;
instance->end_chart_formatting = NULL;
instance->end_host_formatting = NULL;
instance->end_batch_formatting = NULL;
instance->buffer = (void *)buffer_create(0);
if (!instance->buffer) {
error("EXPORTING: cannot create buffer for json exporting connector instance %s", instance->config.name);
return 1;
}
uv_mutex_init(&instance->mutex);
uv_cond_init(&instance->cond_var);
return 0;
}
/**
* Format dimension using collected data for JSON connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
const char *tags_pre = "", *tags_post = "", *tags = host->tags;
if (!tags)
tags = "";
if (*tags) {
if (*tags == '{' || *tags == '[' || *tags == '"') {
tags_pre = "\"host_tags\":";
tags_post = ",";
} else {
tags_pre = "\"host_tags\":\"";
tags_post = "\",";
}
}
buffer_sprintf(
instance->buffer,
"{"
"\"prefix\":\"%s\","
"\"hostname\":\"%s\","
"%s%s%s"
"\"chart_id\":\"%s\","
"\"chart_name\":\"%s\","
"\"chart_family\":\"%s\","
"\"chart_context\": \"%s\","
"\"chart_type\":\"%s\","
"\"units\": \"%s\","
"\"id\":\"%s\","
"\"name\":\"%s\","
"\"value\":" COLLECTED_NUMBER_FORMAT ","
"\"timestamp\": %llu}\n",
engine->config.prefix,
engine->config.hostname,
tags_pre,
tags,
tags_post,
st->id,
st->name,
st->family,
st->context,
st->type,
st->units,
rd->id,
rd->name,
rd->last_collected_value,
(unsigned long long)rd->last_collected_time.tv_sec);
return 0;
}
/**
* Format dimension using a calculated value from stored data for JSON connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
time_t last_t;
calculated_number value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
if(isnan(value))
return 0;
const char *tags_pre = "", *tags_post = "", *tags = host->tags;
if (!tags)
tags = "";
if (*tags) {
if (*tags == '{' || *tags == '[' || *tags == '"') {
tags_pre = "\"host_tags\":";
tags_post = ",";
} else {
tags_pre = "\"host_tags\":\"";
tags_post = "\",";
}
}
buffer_sprintf(
instance->buffer,
"{"
"\"prefix\":\"%s\","
"\"hostname\":\"%s\","
"%s%s%s"
"\"chart_id\":\"%s\","
"\"chart_name\":\"%s\","
"\"chart_family\":\"%s\","
"\"chart_context\": \"%s\","
"\"chart_type\":\"%s\","
"\"units\": \"%s\","
"\"id\":\"%s\","
"\"name\":\"%s\","
"\"value\":" CALCULATED_NUMBER_FORMAT ","
"\"timestamp\": %llu}\n",
engine->config.prefix,
engine->config.hostname,
tags_pre,
tags,
tags_post,
st->id,
st->name,
st->family,
st->context,
st->type,
st->units,
rd->id,
rd->name,
value,
(unsigned long long)last_t);
return 0;
}

13
exporting/json/json.h Normal file
View File

@@ -0,0 +1,13 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_EXPORTING_JSON_H
#define NETDATA_EXPORTING_JSON_H
#include "exporting/exporting_engine.h"
int init_json_connector(struct connector *connector);
int init_json_instance(struct instance *instance);
int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd);
int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd);
#endif //NETDATA_EXPORTING_JSON_H

View File

@@ -0,0 +1,4 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in

View File

@@ -0,0 +1,305 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "opentsdb.h"
/**
* Initialize OpenTSDB connector
*
* @param instance a connector data structure.
* @return Always returns 0.
*/
int init_opentsdb_connector(struct connector *connector)
{
connector->worker = simple_connector_worker;
struct simple_connector_config *connector_specific_config = mallocz(sizeof(struct simple_connector_config));
connector->config.connector_specific_config = (void *)connector_specific_config;
connector_specific_config->default_port = 4242;
return 0;
}
/**
* Initialize OpenTSDB telnet connector instance
*
* @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/
int init_opentsdb_telnet_instance(struct instance *instance)
{
instance->start_batch_formatting = NULL;
instance->start_host_formatting = NULL;
instance->start_chart_formatting = NULL;
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
instance->metric_formatting = format_dimension_collected_opentsdb_telnet;
else
instance->metric_formatting = format_dimension_stored_opentsdb_telnet;
instance->end_chart_formatting = NULL;
instance->end_host_formatting = NULL;
instance->end_batch_formatting = NULL;
instance->buffer = (void *)buffer_create(0);
if (!instance->buffer) {
error("EXPORTING: cannot create buffer for opentsdb telnet exporting connector instance %s", instance->config.name);
return 1;
}
uv_mutex_init(&instance->mutex);
uv_cond_init(&instance->cond_var);
return 0;
}
/**
* Initialize OpenTSDB HTTP connector instance
*
* @param instance an instance data structure.
* @return Returns 0 on success, 1 on failure.
*/
int init_opentsdb_http_instance(struct instance *instance)
{
instance->start_batch_formatting = NULL;
instance->start_host_formatting = NULL;
instance->start_chart_formatting = NULL;
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
instance->metric_formatting = format_dimension_collected_opentsdb_http;
else
instance->metric_formatting = format_dimension_stored_opentsdb_http;
instance->end_chart_formatting = NULL;
instance->end_host_formatting = NULL;
instance->end_batch_formatting = NULL;
instance->buffer = (void *)buffer_create(0);
if (!instance->buffer) {
error("EXPORTING: cannot create buffer for opentsdb HTTP exporting connector instance %s", instance->config.name);
return 1;
}
uv_mutex_init(&instance->mutex);
uv_cond_init(&instance->cond_var);
return 0;
}
/**
* Format dimension using collected data for OpenTSDB telnet connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_collected_opentsdb_telnet(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
char chart_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
chart_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
RRD_ID_LENGTH_MAX);
char dimension_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
dimension_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
RRD_ID_LENGTH_MAX);
buffer_sprintf(
instance->buffer,
"put %s.%s.%s %llu " COLLECTED_NUMBER_FORMAT " host=%s%s%s\n",
engine->config.prefix,
chart_name,
dimension_name,
(unsigned long long)rd->last_collected_time.tv_sec,
rd->last_collected_value,
engine->config.hostname,
(host->tags) ? " " : "",
(host->tags) ? host->tags : "");
return 0;
}
/**
* Format dimension using a calculated value from stored data for OpenTSDB telnet connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
char chart_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
chart_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
RRD_ID_LENGTH_MAX);
char dimension_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
dimension_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
RRD_ID_LENGTH_MAX);
time_t last_t;
calculated_number value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
if(isnan(value))
return 0;
buffer_sprintf(
instance->buffer,
"put %s.%s.%s %llu " CALCULATED_NUMBER_FORMAT " host=%s%s%s\n",
engine->config.prefix,
chart_name,
dimension_name,
(unsigned long long)last_t,
value,
engine->config.hostname,
(host->tags) ? " " : "",
(host->tags) ? host->tags : "");
return 0;
}
/**
* Prepare an HTTP message for OpenTSDB HTTP connector
*
* @param buffer a buffer to write the message to.
* @param message the body of the message.
* @param hostname the name of the host that sends the message.
* @param length the length of the message body.
*/
static inline void opentsdb_build_message(BUFFER *buffer, char *message, const char *hostname, int length)
{
buffer_sprintf(
buffer,
"POST /api/put HTTP/1.1\r\n"
"Host: %s\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"\r\n"
"%s",
hostname,
length,
message);
}
/**
* Format dimension using collected data for OpenTSDB HTTP connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
char chart_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
chart_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
RRD_ID_LENGTH_MAX);
char dimension_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
dimension_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
RRD_ID_LENGTH_MAX);
char message[1024];
int length = snprintfz(
message,
sizeof(message),
"{"
" \"metric\": \"%s.%s.%s\","
" \"timestamp\": %llu,"
" \"value\": " COLLECTED_NUMBER_FORMAT ","
" \"tags\": {"
" \"host\": \"%s%s%s\""
" }"
"}",
engine->config.prefix,
chart_name,
dimension_name,
(unsigned long long)rd->last_collected_time.tv_sec,
rd->last_collected_value,
engine->config.hostname,
(host->tags) ? " " : "",
(host->tags) ? host->tags : "");
if (length > 0) {
opentsdb_build_message(instance->buffer, message, engine->config.hostname, length);
}
return 0;
}
/**
* Format dimension using a calculated value from stored data for OpenTSDB HTTP connector
*
* @param instance an instance data structure.
* @param rd a dimension.
* @return Always returns 0.
*/
int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd)
{
struct engine *engine = instance->connector->engine;
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
char chart_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
chart_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
RRD_ID_LENGTH_MAX);
char dimension_name[RRD_ID_LENGTH_MAX + 1];
exporting_name_copy(
dimension_name,
(instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
RRD_ID_LENGTH_MAX);
time_t last_t;
calculated_number value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
if(isnan(value))
return 0;
char message[1024];
int length = snprintfz(
message,
sizeof(message),
"{"
" \"metric\": \"%s.%s.%s\","
" \"timestamp\": %llu,"
" \"value\": " CALCULATED_NUMBER_FORMAT ","
" \"tags\": {"
" \"host\": \"%s%s%s\""
" }"
"}",
engine->config.prefix,
chart_name,
dimension_name,
(unsigned long long)last_t,
value,
engine->config.hostname,
(host->tags) ? " " : "",
(host->tags) ? host->tags : "");
if (length > 0) {
opentsdb_build_message(instance->buffer, message, engine->config.hostname, length);
}
return 0;
}

View File

@@ -0,0 +1,18 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_EXPORTING_OPENTSDB_H
#define NETDATA_EXPORTING_OPENTSDB_H
#include "exporting/exporting_engine.h"
int init_opentsdb_connector(struct connector *connector);
int init_opentsdb_telnet_instance(struct instance *instance);
int init_opentsdb_http_instance(struct instance *instance);
int format_dimension_collected_opentsdb_telnet(struct instance *instance, RRDDIM *rd);
int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *rd);
int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *rd);
int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd);
#endif //NETDATA_EXPORTING_OPENTSDB_H

404
exporting/process_data.c Normal file
View File

@@ -0,0 +1,404 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "exporting_engine.h"
/**
* Normalize chart and dimension names
*
* Substitute '_' for any special character except '.'.
*
* @param dst where to copy name to.
* @param src where to copy name from.
* @param max_len the maximum size of copied name.
* @return Returns the size of the copied name.
*/
size_t exporting_name_copy(char *dst, const char *src, size_t max_len)
{
size_t n;
for (n = 0; *src && n < max_len; dst++, src++, n++) {
char c = *src;
if (c != '.' && !isalnum(c))
*dst = '_';
else
*dst = c;
}
*dst = '\0';
return n;
}
/**
* Mark scheduled instances
*
* Any instance can have its own update interval. On every exporting engine update only those instances are picked,
* which are scheduled for the update.
*
* @param engine an engine data structure.
* @return Returns 1 if there are instances to process
*/
int mark_scheduled_instances(struct engine *engine)
{
int instances_were_scheduled = 0;
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (engine->now % instance->config.update_every < localhost->rrd_update_every) {
instance->scheduled = 1;
instances_were_scheduled = 1;
instance->before = engine->now;
}
}
}
return instances_were_scheduled;
}
/**
* Calculate the SUM or AVERAGE of a dimension, for any timeframe
*
* May return NAN if the database does not have any value in the give timeframe.
*
* @param instance an instance data structure.
* @param rd a dimension(metric) in the Netdata database.
* @param last_timestamp the timestamp that should be reported to the exporting connector instance.
* @return Returns the value, calculated over the given period.
*/
calculated_number exporting_calculate_value_from_stored_data(
struct instance *instance,
RRDDIM *rd,
time_t *last_timestamp)
{
RRDSET *st = rd->rrdset;
RRDHOST *host = st->rrdhost;
time_t after = instance->after;
time_t before = instance->before;
// find the edges of the rrd database for this chart
time_t first_t = rd->state->query_ops.oldest_time(rd);
time_t last_t = rd->state->query_ops.latest_time(rd);
time_t update_every = st->update_every;
struct rrddim_query_handle handle;
storage_number n;
// step back a little, to make sure we have complete data collection
// for all metrics
after -= update_every * 2;
before -= update_every * 2;
// align the time-frame
after = after - (after % update_every);
before = before - (before % update_every);
// for before, loose another iteration
// the latest point will be reported the next time
before -= update_every;
if (unlikely(after > before))
// this can happen when update_every > before - after
after = before;
if (unlikely(after < first_t))
after = first_t;
if (unlikely(before > last_t))
before = last_t;
if (unlikely(before < first_t || after > last_t)) {
// the chart has not been updated in the wanted timeframe
debug(
D_BACKEND,
"EXPORTING: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu",
host->hostname,
st->id,
rd->id,
(unsigned long)after,
(unsigned long)before,
(unsigned long)first_t,
(unsigned long)last_t);
return NAN;
}
*last_timestamp = before;
size_t counter = 0;
calculated_number sum = 0;
for (rd->state->query_ops.init(rd, &handle, after, before); !rd->state->query_ops.is_finished(&handle);) {
time_t curr_t;
n = rd->state->query_ops.next_metric(&handle, &curr_t);
if (unlikely(!does_storage_number_exist(n))) {
// not collected
continue;
}
calculated_number value = unpack_storage_number(n);
sum += value;
counter++;
}
rd->state->query_ops.finalize(&handle);
if (unlikely(!counter)) {
debug(
D_BACKEND,
"EXPORTING: %s.%s.%s: no values stored in database for range %lu to %lu",
host->hostname,
st->id,
rd->id,
(unsigned long)after,
(unsigned long)before);
return NAN;
}
if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM))
return sum;
return sum / (calculated_number)counter;
}
/**
* Start batch formatting for every connector instance's buffer
*
* @param engine an engine data structure.
* @return Returns 0 on success, 1 on failure.
*/
int start_batch_formatting(struct engine *engine)
{
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (instance->scheduled) {
uv_mutex_lock(&instance->mutex);
if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
return 1;
}
}
}
}
return 0;
}
/**
* Start host formatting for every connector instance's buffer
*
* @param engine an engine data structure.
* @param host a data collecting host.
* @return Returns 0 on success, 1 on failure.
*/
int start_host_formatting(struct engine *engine, RRDHOST *host)
{
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (instance->scheduled) {
if (rrdhost_is_exportable(instance, host)) {
if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
error("EXPORTING: cannot start host formatting for %s", instance->config.name);
return 1;
}
} else {
instance->skip_host = 1;
}
}
}
}
return 0;
}
/**
* Start chart formatting for every connector instance's buffer
*
* @param engine an engine data structure.
* @param a chart.
* @return Returns 0 on success, 1 on failure.
*/
int start_chart_formatting(struct engine *engine, RRDSET *st)
{
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host) {
if (rrdset_is_exportable(instance, st)) {
if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
return 1;
}
} else {
instance->skip_chart = 1;
}
}
}
}
return 0;
}
/**
* Format metric for every connector instance's buffer
*
* @param engine an engine data structure.
* @param rd a dimension(metric) in the Netdata database.
* @return Returns 0 on success, 1 on failure.
*/
int metric_formatting(struct engine *engine, RRDDIM *rd)
{
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
error("EXPORTING: cannot format metric for %s", instance->config.name);
return 1;
}
instance->stats.chart_buffered_metrics++;
}
}
}
return 0;
}
/**
* End chart formatting for every connector instance's buffer
*
* @param engine an engine data structure.
* @param a chart.
* @return Returns 0 on success, 1 on failure.
*/
int end_chart_formatting(struct engine *engine, RRDSET *st)
{
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
return 1;
}
}
instance->skip_chart = 0;
}
}
return 0;
}
/**
* End host formatting for every connector instance's buffer
*
* @param engine an engine data structure.
* @param host a data collecting host.
* @return Returns 0 on success, 1 on failure.
*/
int end_host_formatting(struct engine *engine, RRDHOST *host)
{
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (instance->scheduled && !instance->skip_host) {
if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
error("EXPORTING: cannot end host formatting for %s", instance->config.name);
return 1;
}
}
instance->skip_host = 0;
}
}
return 0;
}
/**
* End batch formatting for every connector instance's buffer
*
* @param engine an engine data structure.
* @return Returns 0 on success, 1 on failure.
*/
int end_batch_formatting(struct engine *engine)
{
for (struct connector *connector = engine->connector_root; connector; connector = connector->next) {
for (struct instance *instance = connector->instance_root; instance; instance = instance->next) {
if (instance->scheduled) {
if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
return 1;
}
uv_mutex_unlock(&instance->mutex);
uv_cond_signal(&instance->cond_var);
instance->scheduled = 0;
instance->after = instance->before;
}
}
}
return 0;
}
/**
* Prepare buffers
*
* Walk through the Netdata database and fill buffers for every scheduled exporting connector instance according to
* configured rules.
*
* @param engine an engine data structure.
* @return Returns 0 on success, 1 on failure.
*/
int prepare_buffers(struct engine *engine)
{
netdata_thread_disable_cancelability();
rrd_rdlock();
if (start_batch_formatting(engine) != 0)
return 1;
RRDHOST *host;
rrdhost_foreach_read(host)
{
rrdhost_rdlock(host);
if (start_host_formatting(engine, host) != 0)
return 1;
RRDSET *st;
rrdset_foreach_read(st, host)
{
rrdset_rdlock(st);
if (start_chart_formatting(engine, st) != 0)
return 1;
RRDDIM *rd;
rrddim_foreach_read(rd, st)
{
if (metric_formatting(engine, rd) != 0)
return 1;
}
if (end_chart_formatting(engine, st) != 0)
return 1;
rrdset_unlock(st);
}
if (end_host_formatting(engine, host) != 0)
return 1;
rrdhost_unlock(host);
}
if (end_batch_formatting(engine) != 0)
return 1;
rrd_unlock();
netdata_thread_enable_cancelability();
return 0;
}
/**
* Notify workers
*
* Notify exporting connector instance working threads that data is ready to send.
*
* @param engine an engine data structure.
* @return Returns 0 on success, 1 on failure.
*/
int notify_workers(struct engine *engine)
{
(void)engine;
return 0;
}

354
exporting/read_config.c Normal file
View File

@@ -0,0 +1,354 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "exporting_engine.h"
struct config exporting_config = {.sections = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {.avl_tree = {.root = NULL, .compar = appconfig_section_compare},
.rwlock = AVL_LOCK_INITIALIZER}};
static _CONNECTOR_INSTANCE *find_instance(const char *section)
{
_CONNECTOR_INSTANCE *local_ci;
local_ci = add_connector_instance(NULL, NULL); // Get root section
if (unlikely(!local_ci))
return local_ci;
if (!section)
return local_ci;
while (local_ci) {
if (!strcmp(local_ci->instance_name, section))
break;
local_ci = local_ci->next;
}
return local_ci;
}
char *expconfig_get(struct config *root, const char *section, const char *name, const char *default_value)
{
_CONNECTOR_INSTANCE *local_ci;
if (!strcmp(section, CONFIG_SECTION_EXPORTING))
return appconfig_get(root, CONFIG_SECTION_EXPORTING, name, default_value);
local_ci = find_instance(section);
if (!local_ci)
return NULL; // TODO: Check if it is meaningful to return default_value
return appconfig_get(
root,
local_ci->instance_name,
name,
appconfig_get(
root, local_ci->connector_name, name, appconfig_get(root, CONFIG_SECTION_EXPORTING, name, default_value)));
}
int expconfig_get_boolean(struct config *root, const char *section, const char *name, int default_value)
{
_CONNECTOR_INSTANCE *local_ci;
if (!strcmp(section, CONFIG_SECTION_EXPORTING))
return appconfig_get_boolean(root, CONFIG_SECTION_EXPORTING, name, default_value);
local_ci = find_instance(section);
if (!local_ci)
return 0; // TODO: Check if it is meaningful to return default_value
return appconfig_get_boolean(
root,
local_ci->instance_name,
name,
appconfig_get_boolean(
root,
local_ci->connector_name,
name,
appconfig_get_boolean(root, CONFIG_SECTION_EXPORTING, name, default_value)));
}
long long expconfig_get_number(struct config *root, const char *section, const char *name, long long default_value)
{
_CONNECTOR_INSTANCE *local_ci;
if (!strcmp(section, CONFIG_SECTION_EXPORTING))
return appconfig_get_number(root, CONFIG_SECTION_EXPORTING, name, default_value);
local_ci = find_instance(section);
if (!local_ci)
return 0; // TODO: Check if it is meaningful to return default_value
return appconfig_get_number(
root,
local_ci->instance_name,
name,
appconfig_get_number(
root,
local_ci->connector_name,
name,
appconfig_get_number(root, CONFIG_SECTION_EXPORTING, name, default_value)));
}
/*
* Get the next connector instance that we need to activate
*
* @param @target_ci will be filled with instance name and connector name
*
* @return - 1 if more connectors to be fetched, 0 done
*
*/
int get_connector_instance(struct connector_instance *target_ci)
{
static _CONNECTOR_INSTANCE *local_ci = NULL;
_CONNECTOR_INSTANCE *global_connector_instance;
global_connector_instance = find_instance(NULL); // Fetch head of instances
if (unlikely(!global_connector_instance))
return 0;
if (target_ci == NULL) {
local_ci = NULL;
return 1;
}
if (local_ci == NULL)
local_ci = global_connector_instance;
else {
local_ci = local_ci->next;
if (local_ci == NULL)
return 0;
}
strcpy(target_ci->instance_name, local_ci->instance_name);
strcpy(target_ci->connector_name, local_ci->connector_name);
return 1;
}
/**
* Select Type
*
* Select the connector type based on the user input
*
* @param type is the string that defines the connector type
*
* @return It returns the connector id.
*/
BACKEND_TYPE exporting_select_type(const char *type)
{
if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
return BACKEND_TYPE_GRAPHITE;
} else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
return BACKEND_TYPE_OPENTSDB_USING_TELNET;
} else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
return BACKEND_TYPE_OPENTSDB_USING_HTTP;
} else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
return BACKEND_TYPE_JSON;
} else if (!strcmp(type, "prometheus_remote_write")) {
return BACKEND_TYPE_PROMETEUS;
} else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
return BACKEND_TYPE_KINESIS;
} else if (!strcmp(type, "mongodb") || !strcmp(type, "mongodb:plaintext"))
return BACKEND_TYPE_MONGODB;
return BACKEND_TYPE_UNKNOWN;
}
EXPORTING_OPTIONS exporting_parse_data_source(const char *data_source, EXPORTING_OPTIONS exporting_options) {
if(!strcmp(data_source, "raw") || !strcmp(data_source, "as collected") || !strcmp(data_source, "as-collected") || !strcmp(data_source, "as_collected") || !strcmp(data_source, "ascollected")) {
exporting_options |= EXPORTING_SOURCE_DATA_AS_COLLECTED;
exporting_options &= ~(EXPORTING_OPTIONS_SOURCE_BITS ^ EXPORTING_SOURCE_DATA_AS_COLLECTED);
}
else if(!strcmp(data_source, "average")) {
exporting_options |= EXPORTING_SOURCE_DATA_AVERAGE;
exporting_options &= ~(EXPORTING_OPTIONS_SOURCE_BITS ^ EXPORTING_SOURCE_DATA_AVERAGE);
}
else if(!strcmp(data_source, "sum") || !strcmp(data_source, "volume")) {
exporting_options |= EXPORTING_SOURCE_DATA_SUM;
exporting_options &= ~(EXPORTING_OPTIONS_SOURCE_BITS ^ EXPORTING_SOURCE_DATA_SUM);
}
else {
error("EXPORTING: invalid data data_source method '%s'.", data_source);
}
return exporting_options;
}
/**
* Read configuration
*
* Based on read configuration an engine data structure is filled with exporting connector instances.
*
* @return Returns a filled engine data structure or NULL if there are no connector instances configured.
*/
struct engine *read_exporting_config()
{
int instances_to_activate = 0;
int exporting_config_exists = 0;
static struct engine *engine = NULL;
struct connector_instance_list {
struct connector_instance local_ci;
struct connector_instance_list *next;
};
struct connector_instance local_ci;
struct connector_instance_list *tmp_ci_list, *tmp_ci_list1;
struct connector_instance_list **ci_list;
if (unlikely(engine))
return engine;
char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, EXPORTING_CONF);
exporting_config_exists = appconfig_load(&exporting_config, filename, 0);
if (!exporting_config_exists) {
info("CONFIG: cannot load user exporting config '%s'. Will try the stock version.", filename);
freez(filename);
filename = strdupz_path_subpath(netdata_configured_stock_config_dir, EXPORTING_CONF);
exporting_config_exists = appconfig_load(&exporting_config, filename, 0);
if (!exporting_config_exists)
info("CONFIG: cannot load stock exporting config '%s'. Running with internal defaults.", filename);
}
freez(filename);
// Will build a list of instances per connector
// TODO: change BACKEND to EXPORTING
ci_list = callocz(BACKEND_TYPE_NUM, sizeof(struct connector_instance_list *));
while (get_connector_instance(&local_ci)) {
BACKEND_TYPE backend_type;
info("Processing connector instance (%s)", local_ci.instance_name);
if (exporter_get_boolean(local_ci.instance_name, "enabled", 0)) {
backend_type = exporting_select_type(local_ci.connector_name);
info(
"Instance (%s) on connector (%s) is enabled and scheduled for activation",
local_ci.instance_name,
local_ci.connector_name);
tmp_ci_list = (struct connector_instance_list *)callocz(1, sizeof(struct connector_instance_list));
memcpy(&tmp_ci_list->local_ci, &local_ci, sizeof(local_ci));
tmp_ci_list->next = ci_list[backend_type];
ci_list[backend_type] = tmp_ci_list;
instances_to_activate++;
} else
info("Instance (%s) on connector (%s) is not enabled", local_ci.instance_name, local_ci.connector_name);
}
if (unlikely(!instances_to_activate)) {
info("No connector instances to activate");
freez(ci_list);
return NULL;
}
engine = (struct engine *)calloc(1, sizeof(struct engine));
// TODO: Check and fill engine fields if actually needed
if (exporting_config_exists) {
engine->config.hostname =
strdupz(exporter_get(CONFIG_SECTION_EXPORTING, "hostname", netdata_configured_hostname));
engine->config.prefix = strdupz(exporter_get(CONFIG_SECTION_EXPORTING, "prefix", "netdata"));
engine->config.update_every =
exporter_get_number(CONFIG_SECTION_EXPORTING, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
}
for (size_t i = 0; i < BACKEND_TYPE_NUM; i++) {
// For each connector build list
tmp_ci_list = ci_list[i];
// If we have a list of instances for this connector then build it
if (tmp_ci_list) {
struct connector *tmp_connector;
tmp_connector = (struct connector *)calloc(1, sizeof(struct connector));
tmp_connector->next = engine->connector_root;
engine->connector_root = tmp_connector;
tmp_connector->config.type = i;
tmp_connector->engine = engine;
while (tmp_ci_list) {
struct instance *tmp_instance;
char *instance_name;
info("Instance %s on %s", tmp_ci_list->local_ci.instance_name, tmp_ci_list->local_ci.connector_name);
tmp_instance = (struct instance *)calloc(1, sizeof(struct instance));
tmp_instance->connector = engine->connector_root;
tmp_instance->next = engine->connector_root->instance_root;
engine->connector_root->instance_root = tmp_instance;
tmp_instance->connector = engine->connector_root;
instance_name = tmp_ci_list->local_ci.instance_name;
tmp_instance->config.name = strdupz(tmp_ci_list->local_ci.instance_name);
tmp_instance->config.destination =
strdupz(exporter_get(instance_name, EXPORTER_DESTINATION, EXPORTER_DESTINATION_DEFAULT));
tmp_instance->config.update_every =
exporter_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
tmp_instance->config.buffer_on_failures =
exporter_get_number(instance_name, EXPORTER_BUF_ONFAIL, EXPORTER_BUF_ONFAIL_DEFAULT);
tmp_instance->config.timeoutms =
exporter_get_number(instance_name, EXPORTER_TIMEOUT_MS, EXPORTER_TIMEOUT_MS_DEFAULT);
tmp_instance->config.charts_pattern = simple_pattern_create(
exporter_get(instance_name, EXPORTER_SEND_CHART_MATCH, EXPORTER_SEND_CHART_MATCH_DEFAULT),
NULL,
SIMPLE_PATTERN_EXACT);
tmp_instance->config.hosts_pattern = simple_pattern_create(
exporter_get(instance_name, EXPORTER_SEND_HOST_MATCH, EXPORTER_SEND_HOST_MATCH_DEFAULT),
NULL,
SIMPLE_PATTERN_EXACT);
char *data_source =
exporter_get(instance_name, EXPORTER_DATA_SOURCE, EXPORTER_DATA_SOURCE_DEFAULT);
tmp_instance->config.options = exporting_parse_data_source(data_source, tmp_instance->config.options);
if (exporter_get_boolean(instance_name, EXPORTER_SEND_NAMES, EXPORTER_SEND_NAMES_DEFAULT))
tmp_instance->config.options |= EXPORTING_OPTION_SEND_NAMES;
else
tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_NAMES;
#ifdef NETDATA_INTERNAL_CHECKS
info(
" Dest=[%s], upd=[%d], buffer=[%d] timeout=[%ld] options=[%u]",
tmp_instance->config.destination,
tmp_instance->config.update_every,
tmp_instance->config.buffer_on_failures,
tmp_instance->config.timeoutms,
tmp_instance->config.options);
#endif
if (unlikely(!exporting_config_exists) && !engine->config.hostname) {
engine->config.hostname =
strdupz(config_get(instance_name, "hostname", netdata_configured_hostname));
engine->config.prefix = strdupz(config_get(instance_name, "prefix", "netdata"));
engine->config.update_every =
config_get_number(instance_name, EXPORTER_UPDATE_EVERY, EXPORTER_UPDATE_EVERY_DEFAULT);
}
tmp_ci_list1 = tmp_ci_list->next;
freez(tmp_ci_list);
tmp_ci_list = tmp_ci_list1;
}
}
}
freez(ci_list);
return engine;
}

203
exporting/send_data.c Normal file
View File

@@ -0,0 +1,203 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "exporting_engine.h"
/**
* Discard response
*
* Discards a response received by an exporting connector instance after logging a sample of it to error.log
*
* @param buffer buffer with response data.
* @param instance an instance data structure.
* @return Always returns 0.
*/
int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
char sample[1024];
const char *s = buffer_tostring(buffer);
char *d = sample, *e = &sample[sizeof(sample) - 1];
for(; *s && d < e ;s++) {
char c = *s;
if(unlikely(!isprint(c))) c = ' ';
*d++ = c;
}
*d = '\0';
info(
"EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'",
buffer_strlen(buffer),
instance->config.name,
sample);
buffer_flush(buffer);
return 0;
}
/**
* Receive response
*
* @param sock communication socket.
* @param instance an instance data structure.
*/
void simple_connector_receive_response(int *sock, struct instance *instance)
{
static BUFFER *response = NULL;
if (!response)
response = buffer_create(1);
struct stats *stats = &instance->stats;
errno = 0;
// loop through to collect all data
while (*sock != -1 && errno != EWOULDBLOCK) {
buffer_need_bytes(response, 4096);
ssize_t r;
r = recv(*sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
if (likely(r > 0)) {
// we received some data
response->len += r;
stats->chart_received_bytes += r;
stats->chart_receptions++;
} else if (r == 0) {
error("EXPORTING: '%s' closed the socket", instance->config.destination);
close(*sock);
*sock = -1;
} else {
// failed to receive data
if (errno != EAGAIN && errno != EWOULDBLOCK) {
error("EXPORTING: cannot receive data from '%s'.", instance->config.destination);
}
}
#ifdef UNIT_TESTING
break;
#endif
}
// if we received data, process them
if (buffer_strlen(response))
exporting_discard_response(response, instance);
}
/**
* Send buffer to a server
*
* @param sock communication socket.
* @param failures the number of communication failures.
* @param instance an instance data structure.
*/
void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance)
{
BUFFER *buffer = (BUFFER *)instance->buffer;
size_t len = buffer_strlen(buffer);
int flags = 0;
#ifdef MSG_NOSIGNAL
flags += MSG_NOSIGNAL;
#endif
struct stats *stats = &instance->stats;
ssize_t written;
written = send(*sock, buffer_tostring(buffer), len, flags);
if(written != -1 && (size_t)written == len) {
// we sent the data successfully
stats->chart_transmission_successes++;
stats->chart_sent_bytes += written;
stats->chart_sent_metrics = stats->chart_buffered_metrics;
// reset the failures count
*failures = 0;
// empty the buffer
buffer_flush(buffer);
}
else {
// oops! we couldn't send (all or some of the) data
error(
"EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.",
instance->config.destination,
len,
written);
stats->chart_transmission_failures++;
if(written != -1)
stats->chart_sent_bytes += written;
// increment the counter we check for data loss
(*failures)++;
// close the socket - we will re-open it next time
close(*sock);
*sock = -1;
}
}
/**
* Simple connector worker
*
* Runs in a separate thread for every instance.
*
* @param instance_p an instance data structure.
*/
void simple_connector_worker(void *instance_p)
{
struct instance *instance = (struct instance*)instance_p;
struct simple_connector_config *connector_specific_config = instance->connector->config.connector_specific_config;
struct stats *stats = &instance->stats;
int sock = -1;
struct timeval timeout = {.tv_sec = (instance->config.timeoutms * 1000) / 1000000,
.tv_usec = (instance->config.timeoutms * 1000) % 1000000};
int failures = 0;
while(!netdata_exit) {
// ------------------------------------------------------------------------
// if we are connected, receive a response, without blocking
if(likely(sock != -1))
simple_connector_receive_response(&sock, instance);
// ------------------------------------------------------------------------
// if we are not connected, connect to a data collecting server
if(unlikely(sock == -1)) {
size_t reconnects = 0;
sock = connect_to_one_of(
instance->config.destination,
connector_specific_config->default_port,
&timeout,
&reconnects,
NULL,
0);
stats->chart_reconnects += reconnects;
}
if(unlikely(netdata_exit)) break;
// ------------------------------------------------------------------------
// if we are connected, send our buffer to the data collecting server
uv_mutex_lock(&instance->mutex);
uv_cond_wait(&instance->cond_var, &instance->mutex);
if (likely(sock != -1)) {
simple_connector_send_buffer(&sock, &failures, instance);
} else {
error("EXPORTING: failed to update '%s'", instance->config.destination);
stats->chart_transmission_failures++;
// increment the counter we check for data loss
failures++;
}
uv_mutex_unlock(&instance->mutex);
#ifdef UNIT_TESTING
break;
#endif
}
}

View File

@@ -0,0 +1,18 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "exporting_engine.h"
/**
* Send internal metrics
*
* Send performance metrics for the operation of exporting engine itself to the Netdata database.
*
* @param engine an engine data structure.
* @return Returns 0 on success, 1 on failure.
*/
int send_internal_metrics(struct engine *engine)
{
(void)engine;
return 0;
}

View File

@@ -0,0 +1,4 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in

View File

@@ -0,0 +1,162 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "test_exporting_engine.h"
struct engine *__real_read_exporting_config();
struct engine *__wrap_read_exporting_config()
{
function_called();
return mock_ptr_type(struct engine *);
}
struct engine *__mock_read_exporting_config()
{
struct engine *engine = calloc(1, sizeof(struct engine));
engine->config.prefix = strdupz("netdata");
engine->config.hostname = strdupz("test-host");
engine->config.update_every = 3;
engine->connector_root = calloc(1, sizeof(struct connector));
engine->connector_root->config.type = BACKEND_TYPE_GRAPHITE;
engine->connector_root->engine = engine;
engine->connector_root->instance_root = calloc(1, sizeof(struct instance));
struct instance *instance = engine->connector_root->instance_root;
instance->connector = engine->connector_root;
instance->config.name = strdupz("instance_name");
instance->config.destination = strdupz("localhost");
instance->config.update_every = 1;
instance->config.buffer_on_failures = 10;
instance->config.timeoutms = 10000;
instance->config.charts_pattern = simple_pattern_create("*", NULL, SIMPLE_PATTERN_EXACT);
instance->config.hosts_pattern = simple_pattern_create("*", NULL, SIMPLE_PATTERN_EXACT);
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
return engine;
}
int __real_init_connectors(struct engine *engine);
int __wrap_init_connectors(struct engine *engine)
{
function_called();
check_expected_ptr(engine);
return mock_type(int);
}
int __real_mark_scheduled_instances(struct engine *engine);
int __wrap_mark_scheduled_instances(struct engine *engine)
{
function_called();
check_expected_ptr(engine);
return mock_type(int);
}
calculated_number __real_exporting_calculate_value_from_stored_data(
struct instance *instance,
RRDDIM *rd,
time_t *last_timestamp);
calculated_number __wrap_exporting_calculate_value_from_stored_data(
struct instance *instance,
RRDDIM *rd,
time_t *last_timestamp)
{
(void)instance;
(void)rd;
*last_timestamp = 15052;
function_called();
return mock_type(calculated_number);
}
int __real_prepare_buffers(struct engine *engine);
int __wrap_prepare_buffers(struct engine *engine)
{
function_called();
check_expected_ptr(engine);
return mock_type(int);
}
int __wrap_notify_workers(struct engine *engine)
{
function_called();
check_expected_ptr(engine);
return mock_type(int);
}
int __wrap_send_internal_metrics(struct engine *engine)
{
function_called();
check_expected_ptr(engine);
return mock_type(int);
}
int __wrap_rrdhost_is_exportable(struct instance *instance, RRDHOST *host)
{
function_called();
check_expected_ptr(instance);
check_expected_ptr(host);
return mock_type(int);
}
int __wrap_rrdset_is_exportable(struct instance *instance, RRDSET *st)
{
function_called();
check_expected_ptr(instance);
check_expected_ptr(st);
return mock_type(int);
}
int __mock_start_batch_formatting(struct instance *instance)
{
function_called();
check_expected_ptr(instance);
return mock_type(int);
}
int __mock_start_host_formatting(struct instance *instance, RRDHOST *host)
{
function_called();
check_expected_ptr(instance);
check_expected_ptr(host);
return mock_type(int);
}
int __mock_start_chart_formatting(struct instance *instance, RRDSET *st)
{
function_called();
check_expected_ptr(instance);
check_expected_ptr(st);
return mock_type(int);
}
int __mock_metric_formatting(struct instance *instance, RRDDIM *rd)
{
function_called();
check_expected_ptr(instance);
check_expected_ptr(rd);
return mock_type(int);
}
int __mock_end_chart_formatting(struct instance *instance, RRDSET *st)
{
function_called();
check_expected_ptr(instance);
check_expected_ptr(st);
return mock_type(int);
}
int __mock_end_host_formatting(struct instance *instance, RRDHOST *host)
{
function_called();
check_expected_ptr(instance);
check_expected_ptr(host);
return mock_type(int);
}
int __mock_end_batch_formatting(struct instance *instance)
{
function_called();
check_expected_ptr(instance);
return mock_type(int);
}

View File

@@ -0,0 +1,110 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "test_exporting_engine.h"
int setup_configured_engine(void **state)
{
struct engine *engine = __mock_read_exporting_config();
*state = engine;
return 0;
}
int teardown_configured_engine(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
free((void *)instance->config.destination);
free((void *)instance->config.name);
simple_pattern_free(instance->config.charts_pattern);
simple_pattern_free(instance->config.hosts_pattern);
free(instance);
free(engine->connector_root);
free((void *)engine->config.prefix);
free((void *)engine->config.hostname);
free(engine);
return 0;
}
int setup_rrdhost()
{
localhost = calloc(1, sizeof(RRDHOST));
localhost->rrd_update_every = 1;
localhost->tags = strdupz("TAG1=VALUE1 TAG2=VALUE2");
localhost->rrdset_root = calloc(1, sizeof(RRDSET));
RRDSET *st = localhost->rrdset_root;
st->rrdhost = localhost;
strcpy(st->id, "chart_id");
st->name = strdupz("chart_name");
st->flags |= RRDSET_FLAG_ENABLED;
st->rrd_memory_mode |= RRD_MEMORY_MODE_SAVE;
st->update_every = 1;
localhost->rrdset_root->dimensions = calloc(1, sizeof(RRDDIM));
RRDDIM *rd = localhost->rrdset_root->dimensions;
rd->rrdset = st;
rd->id = strdupz("dimension_id");
rd->name = strdupz("dimension_name");
rd->last_collected_value = 123000321;
rd->last_collected_time.tv_sec = 15051;
rd->next = NULL;
rd->state = calloc(1, sizeof(*rd->state));
rd->state->query_ops.oldest_time = __mock_rrddim_query_oldest_time;
rd->state->query_ops.latest_time = __mock_rrddim_query_latest_time;
rd->state->query_ops.init = __mock_rrddim_query_init;
rd->state->query_ops.is_finished = __mock_rrddim_query_is_finished;
rd->state->query_ops.next_metric = __mock_rrddim_query_next_metric;
rd->state->query_ops.finalize = __mock_rrddim_query_finalize;
return 0;
}
int teardown_rrdhost()
{
RRDDIM *rd = localhost->rrdset_root->dimensions;
free((void *)rd->name);
free((void *)rd->id);
free(rd->state);
free(rd);
RRDSET *st = localhost->rrdset_root;
free((void *)st->name);
free(st);
free((void *)localhost->tags);
free(localhost);
return 0;
}
int setup_initialized_engine(void **state)
{
setup_configured_engine(state);
struct engine *engine = *state;
init_connectors_in_tests(engine);
setup_rrdhost();
return 0;
}
int teardown_initialized_engine(void **state)
{
struct engine *engine = *state;
teardown_rrdhost();
buffer_free(engine->connector_root->instance_root->buffer);
teardown_configured_engine(state);
return 0;
}

View File

@@ -0,0 +1,134 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "test_exporting_engine.h"
// Use memomy allocation functions guarded by CMocka in strdupz
const char *__wrap_strdupz(const char *s)
{
char *duplicate = malloc(sizeof(char) * (strlen(s) + 1));
strcpy(duplicate, s);
return duplicate;
}
time_t __wrap_now_realtime_sec(void)
{
function_called();
return mock_type(time_t);
}
void __wrap_info_int(const char *file, const char *function, const unsigned long line, const char *fmt, ...)
{
(void)file;
(void)function;
(void)line;
function_called();
va_list args;
va_start(args, fmt);
vsnprintf(log_line, MAX_LOG_LINE, fmt, args);
va_end(args);
}
int __wrap_connect_to_one_of(
const char *destination,
int default_port,
struct timeval *timeout,
size_t *reconnects_counter,
char *connected_to,
size_t connected_to_size)
{
(void)timeout;
function_called();
check_expected(destination);
check_expected_ptr(default_port);
// TODO: check_expected_ptr(timeout);
check_expected(reconnects_counter);
check_expected(connected_to);
check_expected(connected_to_size);
return mock_type(int);
}
void __rrdhost_check_rdlock(RRDHOST *host, const char *file, const char *function, const unsigned long line)
{
(void)host;
(void)file;
(void)function;
(void)line;
}
void __rrdset_check_rdlock(RRDSET *st, const char *file, const char *function, const unsigned long line)
{
(void)st;
(void)file;
(void)function;
(void)line;
}
void __rrd_check_rdlock(const char *file, const char *function, const unsigned long line)
{
(void)file;
(void)function;
(void)line;
}
const char *rrd_memory_mode_name(RRD_MEMORY_MODE id)
{
(void)id;
return RRD_MEMORY_MODE_NONE_NAME;
}
time_t __mock_rrddim_query_oldest_time(RRDDIM *rd)
{
(void)rd;
function_called();
return mock_type(time_t);
}
time_t __mock_rrddim_query_latest_time(RRDDIM *rd)
{
(void)rd;
function_called();
return mock_type(time_t);
}
void __mock_rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time)
{
(void)rd;
(void)handle;
function_called();
check_expected(start_time);
check_expected(end_time);
}
int __mock_rrddim_query_is_finished(struct rrddim_query_handle *handle)
{
(void)handle;
function_called();
return mock_type(int);
}
storage_number __mock_rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time)
{
(void)handle;
(void)current_time;
function_called();
return mock_type(storage_number);
}
void __mock_rrddim_query_finalize(struct rrddim_query_handle *handle)
{
(void)handle;
function_called();
}

View File

@@ -0,0 +1,61 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "test_exporting_engine.h"
void __wrap_uv_thread_create(uv_thread_t thread, void (*worker)(void *arg), void *arg)
{
function_called();
check_expected_ptr(thread);
check_expected_ptr(worker);
check_expected_ptr(arg);
}
void __wrap_uv_mutex_lock(uv_mutex_t *mutex)
{
(void)mutex;
}
void __wrap_uv_mutex_unlock(uv_mutex_t *mutex)
{
(void)mutex;
}
void __wrap_uv_cond_signal(uv_cond_t *cond_var)
{
(void)cond_var;
}
void __wrap_uv_cond_wait(uv_cond_t *cond_var, uv_mutex_t *mutex)
{
(void)cond_var;
(void)mutex;
}
ssize_t __wrap_recv(int sockfd, void *buf, size_t len, int flags)
{
function_called();
check_expected(sockfd);
check_expected_ptr(buf);
check_expected(len);
check_expected(flags);
char *mock_string = "Test recv";
strcpy(buf, mock_string);
return strlen(mock_string);
}
ssize_t __wrap_send(int sockfd, const void *buf, size_t len, int flags)
{
function_called();
check_expected(sockfd);
check_expected_ptr(buf);
check_expected_ptr(buf);
check_expected(len);
check_expected(flags);
return strlen(buf);
}

View File

@@ -0,0 +1,738 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "test_exporting_engine.h"
#include "libnetdata/required_dummies.h"
RRDHOST *localhost;
netdata_rwlock_t rrd_rwlock;
// global variables needed by read_exporting_config()
struct config netdata_config;
char *netdata_configured_user_config_dir = ".";
char *netdata_configured_stock_config_dir = ".";
char *netdata_configured_hostname = "test_host";
char log_line[MAX_LOG_LINE + 1];
void init_connectors_in_tests(struct engine *engine)
{
expect_function_call(__wrap_now_realtime_sec);
will_return(__wrap_now_realtime_sec, 2);
expect_function_call(__wrap_uv_thread_create);
expect_value(__wrap_uv_thread_create, thread, &engine->connector_root->instance_root->thread);
expect_value(__wrap_uv_thread_create, worker, simple_connector_worker);
expect_value(__wrap_uv_thread_create, arg, engine->connector_root->instance_root);
assert_int_equal(__real_init_connectors(engine), 0);
assert_int_equal(engine->now, 2);
assert_int_equal(engine->connector_root->instance_root->after, 2);
}
static void test_exporting_engine(void **state)
{
struct engine *engine = *state;
expect_function_call(__wrap_read_exporting_config);
will_return(__wrap_read_exporting_config, engine);
expect_function_call(__wrap_init_connectors);
expect_memory(__wrap_init_connectors, engine, engine, sizeof(struct engine));
will_return(__wrap_init_connectors, 0);
expect_function_call(__wrap_now_realtime_sec);
will_return(__wrap_now_realtime_sec, 2);
expect_function_call(__wrap_mark_scheduled_instances);
expect_memory(__wrap_mark_scheduled_instances, engine, engine, sizeof(struct engine));
will_return(__wrap_mark_scheduled_instances, 1);
expect_function_call(__wrap_prepare_buffers);
expect_memory(__wrap_prepare_buffers, engine, engine, sizeof(struct engine));
will_return(__wrap_prepare_buffers, 0);
expect_function_call(__wrap_notify_workers);
expect_memory(__wrap_notify_workers, engine, engine, sizeof(struct engine));
will_return(__wrap_notify_workers, 0);
expect_function_call(__wrap_send_internal_metrics);
expect_memory(__wrap_send_internal_metrics, engine, engine, sizeof(struct engine));
will_return(__wrap_send_internal_metrics, 0);
void *ptr = malloc(sizeof(int));
assert_ptr_equal(exporting_main(ptr), NULL);
assert_int_equal(engine->now, 2);
free(ptr);
}
static void test_read_exporting_config(void **state)
{
struct engine *engine = __mock_read_exporting_config(); // TODO: use real read_exporting_config() function
*state = engine;
assert_ptr_not_equal(engine, NULL);
assert_string_equal(engine->config.prefix, "netdata");
assert_string_equal(engine->config.hostname, "test-host");
assert_int_equal(engine->config.update_every, 3);
assert_int_equal(engine->instance_num, 0);
struct connector *connector = engine->connector_root;
assert_ptr_not_equal(connector, NULL);
assert_ptr_equal(connector->next, NULL);
assert_ptr_equal(connector->engine, engine);
assert_int_equal(connector->config.type, BACKEND_TYPE_GRAPHITE);
struct instance *instance = connector->instance_root;
assert_ptr_not_equal(instance, NULL);
assert_ptr_equal(instance->next, NULL);
assert_ptr_equal(instance->connector, connector);
assert_string_equal(instance->config.destination, "localhost");
assert_int_equal(instance->config.update_every, 1);
assert_int_equal(instance->config.buffer_on_failures, 10);
assert_int_equal(instance->config.timeoutms, 10000);
assert_true(simple_pattern_matches(instance->config.charts_pattern, "any_chart"));
assert_true(simple_pattern_matches(instance->config.hosts_pattern, "anyt_host"));
assert_int_equal(instance->config.options, EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES);
teardown_configured_engine(state);
}
static void test_init_connectors(void **state)
{
struct engine *engine = *state;
init_connectors_in_tests(engine);
assert_int_equal(engine->instance_num, 1);
struct connector *connector = engine->connector_root;
assert_ptr_equal(connector->next, NULL);
assert_ptr_equal(connector->worker, simple_connector_worker);
struct simple_connector_config *connector_specific_config = connector->config.connector_specific_config;
assert_int_equal(connector_specific_config->default_port, 2003);
struct instance *instance = connector->instance_root;
assert_ptr_equal(instance->next, NULL);
assert_int_equal(instance->index, 0);
assert_ptr_equal(instance->start_batch_formatting, NULL);
assert_ptr_equal(instance->start_host_formatting, NULL);
assert_ptr_equal(instance->start_chart_formatting, NULL);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext);
assert_ptr_equal(instance->end_chart_formatting, NULL);
assert_ptr_equal(instance->end_host_formatting, NULL);
assert_ptr_equal(instance->end_batch_formatting, NULL);
BUFFER *buffer = instance->buffer;
assert_ptr_not_equal(buffer, NULL);
buffer_sprintf(buffer, "%s", "graphite test");
assert_string_equal(buffer_tostring(buffer), "graphite test");
}
static void test_init_graphite_instance(void **state)
{
struct engine *engine = *state;
struct connector *connector = engine->connector_root;
struct instance *instance = connector->instance_root;
init_graphite_connector(connector);
assert_int_equal(
((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 2003);
freez(connector->config.connector_specific_config);
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_graphite_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
instance->config.options = EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_graphite_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_stored_graphite_plaintext);
}
static void test_init_json_instance(void **state)
{
struct engine *engine = *state;
struct connector *connector = engine->connector_root;
struct instance *instance = connector->instance_root;
init_json_connector(connector);
assert_int_equal(
((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 5448);
freez(connector->config.connector_specific_config);
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_json_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_json_plaintext);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
instance->config.options = EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_json_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_stored_json_plaintext);
}
static void test_init_opentsdb_connector(void **state)
{
struct engine *engine = *state;
struct connector *connector = engine->connector_root;
init_opentsdb_connector(connector);
assert_int_equal(
((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 4242);
freez(connector->config.connector_specific_config);
}
static void test_init_opentsdb_telnet_instance(void **state)
{
struct engine *engine = *state;
struct connector *connector = engine->connector_root;
struct instance *instance = connector->instance_root;
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_opentsdb_telnet_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_opentsdb_telnet);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
instance->config.options = EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_opentsdb_telnet_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_stored_opentsdb_telnet);
}
static void test_init_opentsdb_http_instance(void **state)
{
struct engine *engine = *state;
struct connector *connector = engine->connector_root;
struct instance *instance = connector->instance_root;
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_opentsdb_http_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_opentsdb_http);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
instance->config.options = EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_opentsdb_http_instance(instance), 0);
assert_ptr_equal(instance->metric_formatting, format_dimension_stored_opentsdb_http);
}
static void test_mark_scheduled_instances(void **state)
{
struct engine *engine = *state;
assert_int_equal(__real_mark_scheduled_instances(engine), 1);
struct instance *instance = engine->connector_root->instance_root;
assert_int_equal(instance->scheduled, 1);
assert_int_equal(instance->before, 2);
}
static void test_rrdhost_is_exportable(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
expect_function_call(__wrap_info_int);
assert_ptr_equal(localhost->exporting_flags, NULL);
assert_int_equal(__real_rrdhost_is_exportable(instance, localhost), 1);
assert_string_equal(log_line, "enabled exporting of host 'localhost' for instance 'instance_name'");
assert_ptr_not_equal(localhost->exporting_flags, NULL);
assert_int_equal(localhost->exporting_flags[0], RRDHOST_FLAG_BACKEND_SEND);
}
static void test_false_rrdhost_is_exportable(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
simple_pattern_free(instance->config.hosts_pattern);
instance->config.hosts_pattern = simple_pattern_create("!*", NULL, SIMPLE_PATTERN_EXACT);
expect_function_call(__wrap_info_int);
assert_ptr_equal(localhost->exporting_flags, NULL);
assert_int_equal(__real_rrdhost_is_exportable(instance, localhost), 0);
assert_string_equal(log_line, "disabled exporting of host 'localhost' for instance 'instance_name'");
assert_ptr_not_equal(localhost->exporting_flags, NULL);
assert_int_equal(localhost->exporting_flags[0], RRDHOST_FLAG_BACKEND_DONT_SEND);
}
static void test_rrdset_is_exportable(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
RRDSET *st = localhost->rrdset_root;
assert_ptr_equal(st->exporting_flags, NULL);
assert_int_equal(__real_rrdset_is_exportable(instance, st), 1);
assert_ptr_not_equal(st->exporting_flags, NULL);
assert_int_equal(st->exporting_flags[0], RRDSET_FLAG_BACKEND_SEND);
}
static void test_false_rrdset_is_exportable(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
RRDSET *st = localhost->rrdset_root;
simple_pattern_free(instance->config.charts_pattern);
instance->config.charts_pattern = simple_pattern_create("!*", NULL, SIMPLE_PATTERN_EXACT);
assert_ptr_equal(st->exporting_flags, NULL);
assert_int_equal(__real_rrdset_is_exportable(instance, st), 0);
assert_ptr_not_equal(st->exporting_flags, NULL);
assert_int_equal(st->exporting_flags[0], RRDSET_FLAG_BACKEND_IGNORE);
}
static void test_exporting_calculate_value_from_stored_data(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
RRDDIM *rd = localhost->rrdset_root->dimensions;
time_t timestamp;
instance->after = 3;
instance->before = 10;
expect_function_call(__mock_rrddim_query_oldest_time);
will_return(__mock_rrddim_query_oldest_time, 1);
expect_function_call(__mock_rrddim_query_latest_time);
will_return(__mock_rrddim_query_latest_time, 2);
expect_function_call(__mock_rrddim_query_init);
expect_value(__mock_rrddim_query_init, start_time, 1);
expect_value(__mock_rrddim_query_init, end_time, 2);
expect_function_call(__mock_rrddim_query_is_finished);
will_return(__mock_rrddim_query_is_finished, 0);
expect_function_call(__mock_rrddim_query_next_metric);
will_return(__mock_rrddim_query_next_metric, pack_storage_number(27, SN_EXISTS));
expect_function_call(__mock_rrddim_query_is_finished);
will_return(__mock_rrddim_query_is_finished, 0);
expect_function_call(__mock_rrddim_query_next_metric);
will_return(__mock_rrddim_query_next_metric, pack_storage_number(45, SN_EXISTS));
expect_function_call(__mock_rrddim_query_is_finished);
will_return(__mock_rrddim_query_is_finished, 1);
expect_function_call(__mock_rrddim_query_finalize);
assert_int_equal(__real_exporting_calculate_value_from_stored_data(instance, rd, &timestamp), 36);
}
static void test_prepare_buffers(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
instance->start_batch_formatting = __mock_start_batch_formatting;
instance->start_host_formatting = __mock_start_host_formatting;
instance->start_chart_formatting = __mock_start_chart_formatting;
instance->metric_formatting = __mock_metric_formatting;
instance->end_chart_formatting = __mock_end_chart_formatting;
instance->end_host_formatting = __mock_end_host_formatting;
instance->end_batch_formatting = __mock_end_batch_formatting;
__real_mark_scheduled_instances(engine);
expect_function_call(__mock_start_batch_formatting);
expect_value(__mock_start_batch_formatting, instance, instance);
will_return(__mock_start_batch_formatting, 0);
expect_function_call(__wrap_rrdhost_is_exportable);
expect_value(__wrap_rrdhost_is_exportable, instance, instance);
expect_value(__wrap_rrdhost_is_exportable, host, localhost);
will_return(__wrap_rrdhost_is_exportable, 1);
expect_function_call(__mock_start_host_formatting);
expect_value(__mock_start_host_formatting, instance, instance);
expect_value(__mock_start_host_formatting, host, localhost);
will_return(__mock_start_host_formatting, 0);
RRDSET *st = localhost->rrdset_root;
expect_function_call(__wrap_rrdset_is_exportable);
expect_value(__wrap_rrdset_is_exportable, instance, instance);
expect_value(__wrap_rrdset_is_exportable, st, st);
will_return(__wrap_rrdset_is_exportable, 1);
expect_function_call(__mock_start_chart_formatting);
expect_value(__mock_start_chart_formatting, instance, instance);
expect_value(__mock_start_chart_formatting, st, st);
will_return(__mock_start_chart_formatting, 0);
RRDDIM *rd = localhost->rrdset_root->dimensions;
expect_function_call(__mock_metric_formatting);
expect_value(__mock_metric_formatting, instance, instance);
expect_value(__mock_metric_formatting, rd, rd);
will_return(__mock_metric_formatting, 0);
expect_function_call(__mock_end_chart_formatting);
expect_value(__mock_end_chart_formatting, instance, instance);
expect_value(__mock_end_chart_formatting, st, st);
will_return(__mock_end_chart_formatting, 0);
expect_function_call(__mock_end_host_formatting);
expect_value(__mock_end_host_formatting, instance, instance);
expect_value(__mock_end_host_formatting, host, localhost);
will_return(__mock_end_host_formatting, 0);
expect_function_call(__mock_end_batch_formatting);
expect_value(__mock_end_batch_formatting, instance, instance);
will_return(__mock_end_batch_formatting, 0);
assert_int_equal(__real_prepare_buffers(engine), 0);
assert_int_equal(instance->stats.chart_buffered_metrics, 1);
// check with NULL functions
instance->start_batch_formatting = NULL;
instance->start_host_formatting = NULL;
instance->start_chart_formatting = NULL;
instance->metric_formatting = NULL;
instance->end_chart_formatting = NULL;
instance->end_host_formatting = NULL;
instance->end_batch_formatting = NULL;
assert_int_equal(__real_prepare_buffers(engine), 0);
assert_int_equal(instance->scheduled, 0);
assert_int_equal(instance->after, 2);
}
static void test_exporting_name_copy(void **state)
{
(void)state;
char *source_name = "test.name-with/special#characters_";
char destination_name[RRD_ID_LENGTH_MAX + 1];
assert_int_equal(exporting_name_copy(destination_name, source_name, RRD_ID_LENGTH_MAX), 34);
assert_string_equal(destination_name, "test.name_with_special_characters_");
}
static void test_format_dimension_collected_graphite_plaintext(void **state)
{
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_collected_graphite_plaintext(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n");
}
static void test_format_dimension_stored_graphite_plaintext(void **state)
{
struct engine *engine = *state;
expect_function_call(__wrap_exporting_calculate_value_from_stored_data);
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_stored_graphite_plaintext(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 690565856.0000000 15052\n");
}
static void test_format_dimension_collected_json_plaintext(void **state)
{
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_collected_json_plaintext(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"{\"prefix\":\"netdata\",\"hostname\":\"test-host\",\"host_tags\":\"TAG1=VALUE1 TAG2=VALUE2\","
"\"chart_id\":\"chart_id\",\"chart_name\":\"chart_name\",\"chart_family\":\"(null)\","
"\"chart_context\": \"(null)\",\"chart_type\":\"(null)\",\"units\": \"(null)\",\"id\":\"dimension_id\","
"\"name\":\"dimension_name\",\"value\":123000321,\"timestamp\": 15051}\n");
}
static void test_format_dimension_stored_json_plaintext(void **state)
{
struct engine *engine = *state;
expect_function_call(__wrap_exporting_calculate_value_from_stored_data);
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_stored_json_plaintext(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"{\"prefix\":\"netdata\",\"hostname\":\"test-host\",\"host_tags\":\"TAG1=VALUE1 TAG2=VALUE2\","
"\"chart_id\":\"chart_id\",\"chart_name\":\"chart_name\",\"chart_family\":\"(null)\"," \
"\"chart_context\": \"(null)\",\"chart_type\":\"(null)\",\"units\": \"(null)\",\"id\":\"dimension_id\","
"\"name\":\"dimension_name\",\"value\":690565856.0000000,\"timestamp\": 15052}\n");
}
static void test_format_dimension_collected_opentsdb_telnet(void **state)
{
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_collected_opentsdb_telnet(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"put netdata.chart_name.dimension_name 15051 123000321 host=test-host TAG1=VALUE1 TAG2=VALUE2\n");
}
static void test_format_dimension_stored_opentsdb_telnet(void **state)
{
struct engine *engine = *state;
expect_function_call(__wrap_exporting_calculate_value_from_stored_data);
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_stored_opentsdb_telnet(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"put netdata.chart_name.dimension_name 15052 690565856.0000000 host=test-host TAG1=VALUE1 TAG2=VALUE2\n");
}
static void test_format_dimension_collected_opentsdb_http(void **state)
{
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_collected_opentsdb_http(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"POST /api/put HTTP/1.1\r\n"
"Host: test-host\r\n"
"Content-Type: application/json\r\n"
"Content-Length: 153\r\n\r\n"
"{ \"metric\": \"netdata.chart_name.dimension_name\", "
"\"timestamp\": 15051, "
"\"value\": 123000321, "
"\"tags\": { \"host\": \"test-host TAG1=VALUE1 TAG2=VALUE2\" }}");
}
static void test_format_dimension_stored_opentsdb_http(void **state)
{
struct engine *engine = *state;
expect_function_call(__wrap_exporting_calculate_value_from_stored_data);
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
assert_int_equal(format_dimension_stored_opentsdb_http(engine->connector_root->instance_root, rd), 0);
assert_string_equal(
buffer_tostring(engine->connector_root->instance_root->buffer),
"POST /api/put HTTP/1.1\r\n"
"Host: test-host\r\n"
"Content-Type: application/json\r\n"
"Content-Length: 161\r\n\r\n"
"{ \"metric\": \"netdata.chart_name.dimension_name\", "
"\"timestamp\": 15052, "
"\"value\": 690565856.0000000, "
"\"tags\": { \"host\": \"test-host TAG1=VALUE1 TAG2=VALUE2\" }}");
}
static void test_exporting_discard_response(void **state)
{
struct engine *engine = *state;
BUFFER *response = buffer_create(0);
buffer_sprintf(response, "Test response");
expect_function_call(__wrap_info_int);
assert_int_equal(exporting_discard_response(response, engine->connector_root->instance_root), 0);
assert_string_equal(
log_line,
"EXPORTING: received 13 bytes from instance_name connector instance. Ignoring them. Sample: 'Test response'");
assert_int_equal(buffer_strlen(response), 0);
buffer_free(response);
}
static void test_simple_connector_receive_response(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
struct stats *stats = &instance->stats;
int sock = 1;
expect_function_call(__wrap_recv);
expect_value(__wrap_recv, sockfd, 1);
expect_not_value(__wrap_recv, buf, 0);
expect_value(__wrap_recv, len, 4096);
expect_value(__wrap_recv, flags, MSG_DONTWAIT);
expect_function_call(__wrap_info_int);
simple_connector_receive_response(&sock, instance);
assert_string_equal(
log_line,
"EXPORTING: received 9 bytes from instance_name connector instance. Ignoring them. Sample: 'Test recv'");
assert_int_equal(stats->chart_received_bytes, 9);
assert_int_equal(stats->chart_receptions, 1);
assert_int_equal(sock, 1);
}
static void test_simple_connector_send_buffer(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
struct stats *stats = &instance->stats;
BUFFER *buffer = instance->buffer;
int sock = 1;
int failures = 3;
__real_mark_scheduled_instances(engine);
expect_function_call(__wrap_rrdhost_is_exportable);
expect_value(__wrap_rrdhost_is_exportable, instance, instance);
expect_value(__wrap_rrdhost_is_exportable, host, localhost);
will_return(__wrap_rrdhost_is_exportable, 1);
RRDSET *st = localhost->rrdset_root;
expect_function_call(__wrap_rrdset_is_exportable);
expect_value(__wrap_rrdset_is_exportable, instance, instance);
expect_value(__wrap_rrdset_is_exportable, st, st);
will_return(__wrap_rrdset_is_exportable, 1);
__real_prepare_buffers(engine);
expect_function_call(__wrap_send);
expect_value(__wrap_send, sockfd, 1);
expect_value(__wrap_send, buf, buffer_tostring(buffer));
expect_string(
__wrap_send, buf, "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n");
expect_value(__wrap_send, len, 84);
expect_value(__wrap_send, flags, MSG_NOSIGNAL);
simple_connector_send_buffer(&sock, &failures, instance);
assert_int_equal(failures, 0);
assert_int_equal(stats->chart_transmission_successes, 1);
assert_int_equal(stats->chart_sent_bytes, 84);
assert_int_equal(stats->chart_sent_metrics, 1);
assert_int_equal(stats->chart_transmission_failures, 0);
assert_int_equal(buffer_strlen(buffer), 0);
assert_int_equal(sock, 1);
}
static void test_simple_connector_worker(void **state)
{
struct engine *engine = *state;
struct instance *instance = engine->connector_root->instance_root;
BUFFER *buffer = instance->buffer;
__real_mark_scheduled_instances(engine);
expect_function_call(__wrap_rrdhost_is_exportable);
expect_value(__wrap_rrdhost_is_exportable, instance, instance);
expect_value(__wrap_rrdhost_is_exportable, host, localhost);
will_return(__wrap_rrdhost_is_exportable, 1);
RRDSET *st = localhost->rrdset_root;
expect_function_call(__wrap_rrdset_is_exportable);
expect_value(__wrap_rrdset_is_exportable, instance, instance);
expect_value(__wrap_rrdset_is_exportable, st, st);
will_return(__wrap_rrdset_is_exportable, 1);
__real_prepare_buffers(engine);
expect_function_call(__wrap_connect_to_one_of);
expect_string(__wrap_connect_to_one_of, destination, "localhost");
expect_value(__wrap_connect_to_one_of, default_port, 2003);
expect_not_value(__wrap_connect_to_one_of, reconnects_counter, 0);
expect_value(__wrap_connect_to_one_of, connected_to, 0);
expect_value(__wrap_connect_to_one_of, connected_to_size, 0);
will_return(__wrap_connect_to_one_of, 2);
expect_function_call(__wrap_send);
expect_value(__wrap_send, sockfd, 2);
expect_value(__wrap_send, buf, buffer_tostring(buffer));
expect_string(
__wrap_send, buf, "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n");
expect_value(__wrap_send, len, 84);
expect_value(__wrap_send, flags, MSG_NOSIGNAL);
simple_connector_worker(instance);
}
int main(void)
{
const struct CMUnitTest tests[] = {
cmocka_unit_test_setup_teardown(test_exporting_engine, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test(test_read_exporting_config),
cmocka_unit_test_setup_teardown(test_init_connectors, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
test_init_graphite_instance, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
test_init_json_instance, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
test_init_opentsdb_connector, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
test_init_opentsdb_telnet_instance, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
test_init_opentsdb_http_instance, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
test_mark_scheduled_instances, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_rrdhost_is_exportable, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_false_rrdhost_is_exportable, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_rrdset_is_exportable, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_false_rrdset_is_exportable, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_exporting_calculate_value_from_stored_data, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(test_prepare_buffers, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test(test_exporting_name_copy),
cmocka_unit_test_setup_teardown(
test_format_dimension_collected_graphite_plaintext, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_format_dimension_stored_graphite_plaintext, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_format_dimension_collected_json_plaintext, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_format_dimension_stored_json_plaintext, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_format_dimension_collected_opentsdb_telnet, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_format_dimension_stored_opentsdb_telnet, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_format_dimension_collected_opentsdb_http, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_format_dimension_stored_opentsdb_http, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_exporting_discard_response, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_simple_connector_receive_response, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_simple_connector_send_buffer, setup_initialized_engine, teardown_initialized_engine),
cmocka_unit_test_setup_teardown(
test_simple_connector_worker, setup_initialized_engine, teardown_initialized_engine),
};
return cmocka_run_group_tests_name("exporting_engine", tests, NULL, NULL);
}

View File

@@ -0,0 +1,110 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef TEST_EXPORTING_ENGINE_H
#define TEST_EXPORTING_ENGINE_H 1
#include "libnetdata/libnetdata.h"
#include "exporting/exporting_engine.h"
#include "exporting/graphite/graphite.h"
#include "exporting/json/json.h"
#include "exporting/opentsdb/opentsdb.h"
#include <stdarg.h>
#include <stddef.h>
#include <setjmp.h>
#include <stdint.h>
#include <cmocka.h>
#define MAX_LOG_LINE 1024
extern char log_line[];
// -----------------------------------------------------------------------
// doubles for Netdata functions
const char *__wrap_strdupz(const char *s);
void __wrap_info_int(const char *file, const char *function, const unsigned long line, const char *fmt, ...);
int __wrap_connect_to_one_of(
const char *destination,
int default_port,
struct timeval *timeout,
size_t *reconnects_counter,
char *connected_to,
size_t connected_to_size);
void __rrdhost_check_rdlock(RRDHOST *host, const char *file, const char *function, const unsigned long line);
void __rrdset_check_rdlock(RRDSET *st, const char *file, const char *function, const unsigned long line);
void __rrd_check_rdlock(const char *file, const char *function, const unsigned long line);
time_t __mock_rrddim_query_oldest_time(RRDDIM *rd);
time_t __mock_rrddim_query_latest_time(RRDDIM *rd);
void __mock_rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
int __mock_rrddim_query_is_finished(struct rrddim_query_handle *handle);
storage_number __mock_rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time);
void __mock_rrddim_query_finalize(struct rrddim_query_handle *handle);
// -----------------------------------------------------------------------
// wraps for system functions
void __wrap_uv_thread_create(uv_thread_t thread, void (*worker)(void *arg), void *arg);
void __wrap_uv_mutex_lock(uv_mutex_t *mutex);
void __wrap_uv_mutex_unlock(uv_mutex_t *mutex);
void __wrap_uv_cond_signal(uv_cond_t *cond_var);
void __wrap_uv_cond_wait(uv_cond_t *cond_var, uv_mutex_t *mutex);
ssize_t __wrap_recv(int sockfd, void *buf, size_t len, int flags);
ssize_t __wrap_send(int sockfd, const void *buf, size_t len, int flags);
// -----------------------------------------------------------------------
// doubles and originals for exporting engine functions
struct engine *__real_read_exporting_config();
struct engine *__wrap_read_exporting_config();
struct engine *__mock_read_exporting_config();
int __real_init_connectors(struct engine *engine);
int __wrap_init_connectors(struct engine *engine);
int __real_mark_scheduled_instances(struct engine *engine);
int __wrap_mark_scheduled_instances(struct engine *engine);
calculated_number __real_exporting_calculate_value_from_stored_data(
struct instance *instance,
RRDDIM *rd,
time_t *last_timestamp);
calculated_number __wrap_exporting_calculate_value_from_stored_data(
struct instance *instance,
RRDDIM *rd,
time_t *last_timestamp);
int __real_prepare_buffers(struct engine *engine);
int __wrap_prepare_buffers(struct engine *engine);
int __wrap_notify_workers(struct engine *engine);
int __wrap_send_internal_metrics(struct engine *engine);
int __real_rrdhost_is_exportable(struct instance *instance, RRDHOST *host);
int __wrap_rrdhost_is_exportable(struct instance *instance, RRDHOST *host);
int __real_rrdset_is_exportable(struct instance *instance, RRDSET *st);
int __wrap_rrdset_is_exportable(struct instance *instance, RRDSET *st);
int __mock_start_batch_formatting(struct instance *instance);
int __mock_start_host_formatting(struct instance *instance, RRDHOST *host);
int __mock_start_chart_formatting(struct instance *instance, RRDSET *st);
int __mock_metric_formatting(struct instance *instance, RRDDIM *rd);
int __mock_end_chart_formatting(struct instance *instance, RRDSET *st);
int __mock_end_host_formatting(struct instance *instance, RRDHOST *host);
int __mock_end_batch_formatting(struct instance *instance);
// -----------------------------------------------------------------------
// fixtures
int setup_configured_engine(void **state);
int teardown_configured_engine(void **state);
int setup_rrdhost();
int teardown_rrdhost();
int setup_initialized_engine(void **state);
int teardown_initialized_engine(void **state);
void init_connectors_in_tests(struct engine *engine);
#endif /* TEST_EXPORTING_ENGINE_H */

View File

@@ -42,6 +42,89 @@ struct section {
// readers are protected using the rwlock in avl_tree_lock
};
/*
* @Input:
* Connector / instance to add to an internal structure
* @Return
* The current head of the linked list of connector_instance
*
*/
_CONNECTOR_INSTANCE *add_connector_instance(struct section *connector, struct section *instance)
{
static struct _connector_instance *global_connector_instance = NULL;
struct _connector_instance *local_ci, *local_ci_tmp;
if (unlikely(!connector)) {
if (unlikely(!instance))
return global_connector_instance;
local_ci = global_connector_instance;
while (local_ci) {
local_ci_tmp = local_ci->next;
freez(local_ci);
local_ci = local_ci_tmp;
}
global_connector_instance = NULL;
return NULL;
}
local_ci = callocz(1, sizeof(struct _connector_instance));
local_ci->instance = instance;
local_ci->connector = connector;
strncpy(local_ci->instance_name, instance->name, CONFIG_MAX_NAME);
strncpy(local_ci->connector_name, connector->name, CONFIG_MAX_NAME);
local_ci->next = global_connector_instance;
global_connector_instance = local_ci;
return global_connector_instance;
}
int is_valid_connector(char *type, int check_reserved)
{
int rc = 1;
if (unlikely(!type))
return 0;
if (!check_reserved) {
if (unlikely(is_valid_connector(type,1))) {
return 0;
}
//if (unlikely(*type == ':')
// return 0;
char *separator = strrchr(type, ':');
if (likely(separator)) {
*separator = '\0';
rc = separator - type;
} else
return 0;
}
// else {
// if (unlikely(is_valid_connector(type,1))) {
// error("Section %s invalid -- reserved name", type);
// return 0;
// }
// }
if (!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
return rc;
} else if (!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
return rc;
} else if (!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
return rc;
} else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
return rc;
} else if (!strcmp(type, "prometheus_remote_write")) {
return rc;
} else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
return rc;
} else if (!strcmp(type, "mongodb") || !strcmp(type, "mongodb:plaintext")) {
return rc;
}
return 0;
}
// ----------------------------------------------------------------------------
// locking
@@ -241,6 +324,7 @@ cleanup:
return ret;
}
char *appconfig_get(struct config *root, const char *section, const char *name, const char *default_value)
{
struct config_option *cv;
@@ -440,6 +524,14 @@ int appconfig_load(struct config *root, char *filename, int overwrite_used)
{
int line = 0;
struct section *co = NULL;
int is_exporter_config = 0;
int is_backend_config = 0;
int have_backend_config = 0;
int _backends = 0; // number of backend sections we have
char working_instance[CONFIG_MAX_NAME + 1];
char working_connector[CONFIG_MAX_NAME + 1];
struct section *working_connector_section = NULL;
int global_exporting_section = 0;
char buffer[CONFIG_FILE_LINE_MAX + 1], *s;
@@ -453,6 +545,8 @@ int appconfig_load(struct config *root, char *filename, int overwrite_used)
return 0;
}
is_exporter_config = (strstr(filename, EXPORTING_CONF) != NULL);
while(fgets(buffer, CONFIG_FILE_LINE_MAX, fp) != NULL) {
buffer[CONFIG_FILE_LINE_MAX] = '\0';
line++;
@@ -469,6 +563,46 @@ int appconfig_load(struct config *root, char *filename, int overwrite_used)
s[len - 1] = '\0';
s++;
if (is_exporter_config) {
global_exporting_section = !(strcmp(s, CONFIG_SECTION_EXPORTING));
if (unlikely(!global_exporting_section)) {
int rc;
rc = is_valid_connector(s, 0);
if (likely(rc)) {
strncpy(working_connector, s, CONFIG_MAX_NAME);
s = s + rc + 1;
if (unlikely(!(*s))) {
_backends++;
sprintf(buffer, "instance_%d", _backends);
s = buffer;
}
strncpy(working_instance, s, CONFIG_MAX_NAME);
working_connector_section = NULL;
if (unlikely(appconfig_section_find(root, working_instance))) {
error("Instance (%s) already exists", working_instance);
co = NULL;
continue;
}
} else {
co = NULL;
error("Section (%s) does not specify a valid connector", s);
continue;
}
}
}
is_backend_config = !(strcmp(s, CONFIG_SECTION_BACKEND));
if (!have_backend_config)
have_backend_config = is_backend_config;
if (is_backend_config) {
if (_backends) {
sprintf(buffer, CONFIG_SECTION_BACKEND "/%d", _backends);
s = buffer;
}
_backends++;
}
co = appconfig_section_find(root, s);
if(!co) co = appconfig_section_create(root, s);
@@ -502,15 +636,32 @@ int appconfig_load(struct config *root, char *filename, int overwrite_used)
struct config_option *cv = appconfig_option_index_find(co, name, 0);
if(!cv) cv = appconfig_value_create(co, name, value);
else {
if(((cv->flags & CONFIG_VALUE_USED) && overwrite_used) || !(cv->flags & CONFIG_VALUE_USED)) {
debug(D_CONFIG, "CONFIG: line %d of file '%s', overwriting '%s/%s'.", line, filename, co->name, cv->name);
if (!cv) {
cv = appconfig_value_create(co, name, value);
if (likely(is_exporter_config) && unlikely(!global_exporting_section)) {
if (unlikely(!working_connector_section)) {
working_connector_section = appconfig_section_find(root, working_connector);
if (!working_connector_section)
working_connector_section = appconfig_section_create(root, working_connector);
if (likely(working_connector_section)) {
add_connector_instance(working_connector_section, co);
}
}
}
} else {
if (((cv->flags & CONFIG_VALUE_USED) && overwrite_used) || !(cv->flags & CONFIG_VALUE_USED)) {
debug(
D_CONFIG, "CONFIG: line %d of file '%s', overwriting '%s/%s'.", line, filename, co->name, cv->name);
freez(cv->value);
cv->value = strdupz(value);
}
else
debug(D_CONFIG, "CONFIG: ignoring line %d of file '%s', '%s/%s' is already present and used.", line, filename, co->name, cv->name);
} else
debug(
D_CONFIG,
"CONFIG: ignoring line %d of file '%s', '%s/%s' is already present and used.",
line,
filename,
co->name,
cv->name);
}
cv->flags |= CONFIG_VALUE_LOADED;
}

View File

@@ -82,15 +82,17 @@
#define CONFIG_FILENAME "netdata.conf"
#define CONFIG_SECTION_GLOBAL "global"
#define CONFIG_SECTION_WEB "web"
#define CONFIG_SECTION_STATSD "statsd"
#define CONFIG_SECTION_PLUGINS "plugins"
#define CONFIG_SECTION_CLOUD "cloud"
#define CONFIG_SECTION_REGISTRY "registry"
#define CONFIG_SECTION_HEALTH "health"
#define CONFIG_SECTION_BACKEND "backend"
#define CONFIG_SECTION_STREAM "stream"
#define CONFIG_SECTION_GLOBAL "global"
#define CONFIG_SECTION_WEB "web"
#define CONFIG_SECTION_STATSD "statsd"
#define CONFIG_SECTION_PLUGINS "plugins"
#define CONFIG_SECTION_CLOUD "cloud"
#define CONFIG_SECTION_REGISTRY "registry"
#define CONFIG_SECTION_HEALTH "health"
#define CONFIG_SECTION_BACKEND "backend"
#define CONFIG_SECTION_STREAM "stream"
#define CONFIG_SECTION_EXPORTING "exporting:global"
#define EXPORTING_CONF "exporting.conf"
// these are used to limit the configuration names and values lengths
// they are not enforced by config.c functions (they will strdup() all strings, no matter of their length)
@@ -103,6 +105,11 @@ struct config {
avl_tree_lock index;
};
//struct connector_instance {
// char instance_name[CONFIG_MAX_NAME + 1];
// char connector_name[CONFIG_MAX_NAME + 1];
//};
#define CONFIG_BOOLEAN_INVALID 100 // an invalid value to check for validity (used as default initialization when needed)
#define CONFIG_BOOLEAN_NO 0 // disabled
@@ -136,4 +143,20 @@ extern int appconfig_section_compare(void *a, void *b);
extern int config_parse_duration(const char* string, int* result);
struct connector_instance {
char instance_name[CONFIG_MAX_NAME + 1];
char connector_name[CONFIG_MAX_NAME + 1];
};
typedef struct _connector_instance {
struct section *connector; // actual connector
struct section *instance; // This instance
char instance_name[CONFIG_MAX_NAME + 1];
char connector_name[CONFIG_MAX_NAME + 1];
struct _connector_instance *next; // Next instance
} _CONNECTOR_INSTANCE;
extern _CONNECTOR_INSTANCE *add_connector_instance(struct section *connector, struct section *instance);
#endif /* NETDATA_CONFIG_H */