1
0
mirror of https://github.com/netdata/netdata.git synced 2021-06-06 23:03:21 +03:00

Adds ACLK-NG as fallback(#10315)

* adds a new implementation of ACLK written almost from scratch
* external dependencies only OpenSSL and JSON-C
* fallback for systems where ACLK Legacy can't build (for technical or philosophical reasons)
* can be forced to build by giving "--aclk-ng" to the installer
This commit is contained in:
Timotej S
2021-03-16 12:38:16 +01:00
committed by GitHub
parent 2d2f249fd7
commit e7e5d0c372
42 changed files with 3884 additions and 20 deletions

View File

@@ -12,6 +12,8 @@ jobs:
steps:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
- run: |
git fetch --prune --unshallow --tags
- name: Build
@@ -98,6 +100,8 @@ jobs:
steps:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
- name: install-required-packages.sh on ${{ matrix.distro }}
env:
PRE: ${{ matrix.pre }}
@@ -183,6 +187,8 @@ jobs:
steps:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
- name: install-required-packages.sh on ${{ matrix.distro }}
env:
PRE: ${{ matrix.pre }}

View File

@@ -12,6 +12,8 @@ jobs:
steps:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
- name: Run checksum checks on kickstart files
env:
LOCAL_ONLY: "true"
@@ -23,6 +25,8 @@ jobs:
steps:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
- name: Install required packages
run: |
./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata
@@ -43,6 +47,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Build
run: >
docker run -v "$PWD":/netdata -w /netdata alpine:latest /bin/sh -c
@@ -68,6 +74,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Prepare environment
run: |
./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata
@@ -98,6 +106,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Prepare environment
run: ./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata
- name: Build netdata

View File

@@ -15,6 +15,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Prepare environment
env:
DEBIAN_FRONTEND: 'noninteractive'

View File

@@ -26,6 +26,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Determine if we should push changes and which tags to use
if: github.event_name == 'workflow_dispatch' && github.event.inputs.version != 'nightly'
run: |

View File

@@ -16,6 +16,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Run link check
uses: gaurav-nelson/github-action-markdown-link-check@v1
with:

View File

@@ -16,6 +16,7 @@ jobs:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
- name: Check files
run: |
@@ -57,6 +58,7 @@ jobs:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
- name: Check files
run: |
@@ -80,6 +82,7 @@ jobs:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
fetch-depth: 0
- name: Check files
run: |

View File

@@ -21,6 +21,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Prepare environment
run: |
./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata-all
@@ -39,6 +41,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
submodules: recursive
- name: Prepare environment
run: |
./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata-all
@@ -48,7 +52,7 @@ jobs:
- name: Configure
run: |
autoreconf -ivf
./configure
./configure --without-aclk-ng
# XXX: Work-around for bug with libbson-1.0 in Ubuntu 18.04
# See: https://bugs.launchpad.net/ubuntu/+source/libmongoc/+bug/1790771
# https://jira.mongodb.org/browse/CDRIVER-2818

View File

@@ -34,6 +34,8 @@ jobs:
steps:
- name: Git clone repository
uses: actions/checkout@v2
with:
submodules: recursive
- name: Install required packages & build tarball
run: |
./packaging/installer/install-required-packages.sh --dont-wait --non-interactive netdata-all

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "mqtt_websockets"]
path = mqtt_websockets
url = https://github.com/underhood/mqtt_websockets.git

View File

@@ -116,10 +116,18 @@ SUBDIRS += \
web \
claim \
parser \
aclk/legacy \
spawn \
$(NULL)
if ACLK_NG
SUBDIRS += \
mqtt_websockets \
$(NULL)
else
SUBDIRS += \
aclk/legacy \
$(NULL)
endif
AM_CFLAGS = \
$(OPTIONAL_MATH_CFLAGS) \
@@ -525,6 +533,28 @@ PARSER_FILES = \
parser/parser.h \
$(NULL)
if ACLK_NG
ACLK_FILES = \
aclk/aclk.c \
aclk/aclk.h \
aclk/aclk_util.c \
aclk/aclk_util.h \
aclk/aclk_stats.c \
aclk/aclk_stats.h \
aclk/aclk_query.c \
aclk/aclk_query.h \
aclk/aclk_query_queue.c \
aclk/aclk_query_queue.h \
aclk/aclk_collector_list.c \
aclk/aclk_collector_list.h \
aclk/aclk_otp.c \
aclk/aclk_otp.h \
aclk/aclk_tx_msgs.c \
aclk/aclk_tx_msgs.h \
aclk/aclk_rx_msgs.c \
aclk/aclk_rx_msgs.h \
$(NULL)
else #ACLK_NG
ACLK_FILES = \
aclk/legacy/aclk_rrdhost_state.h \
aclk/legacy/aclk_common.c \
@@ -548,9 +578,8 @@ ACLK_FILES += \
aclk/legacy/aclk_lws_https_client.c \
aclk/legacy/aclk_lws_https_client.h \
$(NULL)
endif
endif #ENABLE_ACLK
endif #ACLK_NG
SPAWN_PLUGIN_FILES = \
spawn/spawn.c \
@@ -714,6 +743,12 @@ NETDATACLI_FILES = \
sbin_PROGRAMS += netdata
netdata_SOURCES = $(NETDATA_FILES)
if ACLK_NG
netdata_LDADD = \
mqtt_websockets/libmqttwebsockets.a \
$(NETDATA_COMMON_LIBS) \
$(NULL)
else #ACLK_NG
if ENABLE_ACLK
netdata_LDADD = \
externaldeps/mosquitto/libmosquitto.a \
@@ -721,11 +756,12 @@ netdata_LDADD = \
$(OPTIONAL_LWS_LIBS) \
$(NETDATA_COMMON_LIBS) \
$(NULL)
else
else #ENABLE_ACLK
netdata_LDADD = \
$(NETDATA_COMMON_LIBS) \
$(NULL)
endif
endif #ENABLE_ACLK
endif #ACLK_NG
if ENABLE_CXX_LINKER
netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@

820
aclk/aclk.c Normal file
View File

@@ -0,0 +1,820 @@
#include "aclk.h"
#include "aclk_stats.h"
#include "mqtt_wss_client.h"
#include "aclk_otp.h"
#include "aclk_tx_msgs.h"
#include "aclk_query.h"
#include "aclk_query_queue.h"
#include "aclk_util.h"
#include "aclk_rx_msgs.h"
#include "aclk_collector_list.h"
#ifdef ACLK_LOG_CONVERSATION_DIR
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#endif
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
//TODO remove most (as in 99.999999999%) of this crap
int aclk_connected = 0;
int aclk_disable_runtime = 0;
int aclk_disable_single_updates = 0;
int aclk_kill_link = 0;
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
mqtt_wss_client mqttwss_client;
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
.agent_state = AGENT_INITIALIZING,
.last_popcorn_interrupt = 0,
.version_neg = 0,
.version_neg_wait_till = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
void aclk_single_update_disable()
{
aclk_disable_single_updates = 1;
}
void aclk_single_update_enable()
{
aclk_disable_single_updates = 0;
}
//ENDTODO
static RSA *aclk_private_key = NULL;
static int load_private_key()
{
if (aclk_private_key != NULL)
RSA_free(aclk_private_key);
aclk_private_key = NULL;
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
long bytes_read;
char *private_key = read_by_filename(filename, &bytes_read);
if (!private_key) {
error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
return 1;
}
debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
BIO *key_bio = BIO_new_mem_buf(private_key, -1);
if (key_bio==NULL) {
error("Claimed agent cannot establish ACLK - failed to create BIO for key");
goto biofailed;
}
aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
BIO_free(key_bio);
if (aclk_private_key!=NULL)
{
freez(private_key);
return 0;
}
char err[512];
ERR_error_string_n(ERR_get_error(), err, sizeof(err));
error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
biofailed:
freez(private_key);
return 1;
}
static int wait_till_cloud_enabled()
{
info("Waiting for Cloud to be enabled");
while (!netdata_cloud_setting) {
sleep_usec(USEC_PER_SEC * 1);
if (netdata_exit)
return 1;
}
return 0;
}
/**
* Will block until agent is claimed. Returns only if agent claimed
* or if agent needs to shutdown.
*
* @return `0` if agent has been claimed,
* `1` if interrupted due to agent shutting down
*/
static int wait_till_agent_claimed(void)
{
//TODO prevent malloc and freez
char *agent_id = is_agent_claimed();
while (likely(!agent_id)) {
sleep_usec(USEC_PER_SEC * 1);
if (netdata_exit)
return 1;
agent_id = is_agent_claimed();
}
freez(agent_id);
return 0;
}
/**
* Checks everything is ready for connection
* agent claimed, cloud url set and private key available
*
* @param aclk_hostname points to location where string pointer to hostname will be set
* @param ackl_port port to int where port will be saved
*
* @return If non 0 returned irrecoverable error happened and ACLK should be terminated
*/
static int wait_till_agent_claim_ready()
{
int port;
char *hostname = NULL;
while (!netdata_exit) {
if (wait_till_agent_claimed())
return 1;
// The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
// We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error("Do not move the cloud base url out of post_conf_load!!");
return 1;
}
// We just check configuration is valid here
// TODO make it without malloc/free
if (aclk_decode_base_url(cloud_base_url, &hostname, &port)) {
error("Agent is claimed but the configuration is invalid, please fix");
freez(hostname);
hostname = NULL;
sleep(5);
continue;
}
freez(hostname);
hostname = NULL;
if (!load_private_key()) {
sleep(5);
break;
}
}
return 0;
}
void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
{
switch(log_type) {
case MQTT_WSS_LOG_ERROR:
case MQTT_WSS_LOG_FATAL:
case MQTT_WSS_LOG_WARN:
error("%s", str);
return;
case MQTT_WSS_LOG_INFO:
info("%s", str);
return;
case MQTT_WSS_LOG_DEBUG:
debug(D_ACLK, "%s", str);
return;
default:
error("Unknown log type from mqtt_wss");
}
}
//TODO prevent big buffer on stack
#define RX_MSGLEN_MAX 4096
static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
{
char cmsg[RX_MSGLEN_MAX];
size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
if (msglen > RX_MSGLEN_MAX - 1)
error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
memcpy(cmsg,
msg,
len);
cmsg[len] = 0;
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 512
char filename[FN_MAX_LEN];
int logfd;
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT());
logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
if(logfd < 0)
error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
write(logfd, msg, msglen);
close(logfd);
#endif
debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg);
if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic))
error("Received message on unexpected topic %s", topic);
if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
error("Link is shutting down. Ignoring message.");
return;
}
aclk_handle_cloud_message(cmsg);
}
static void puback_callback(uint16_t packet_id)
{
if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
aclk_reconnect_delay(0);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_puback(packet_id);
#endif
if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
error("Got PUBACK for shutdown message. Can exit gracefully.");
aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
}
}
static int read_query_thread_count()
{
int threads = MIN(processors/2, 6);
threads = MAX(threads, 2);
threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
if(threads < 1) {
error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
threads = 1;
config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
}
return threads;
}
/* Keeps connection alive and handles all network comms.
* Returns on error or when netdata is shutting down.
* @param client instance of mqtt_wss_client
* @returns 0 - Netdata Exits
* >0 - Error happened. Reconnect and start over.
*/
static int handle_connection(mqtt_wss_client client)
{
time_t last_periodic_query_wakeup = now_monotonic_sec();
while (!netdata_exit) {
// timeout 1000 to check at least once a second
// for netdata_exit
if (mqtt_wss_service(client, 1000) < 0){
error("Connection Error or Dropped");
return 1;
}
// mqtt_wss_service will return faster than in one second
// if there is enough work to do
time_t now = now_monotonic_sec();
if (last_periodic_query_wakeup < now) {
// wake up at least one Query Thread at least
// once per second
last_periodic_query_wakeup = now;
QUERY_THREAD_WAKEUP;
}
}
return 0;
}
inline static int aclk_popcorn_check_bump()
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
ACLK_SHARED_STATE_UNLOCK;
return 1;
}
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
static inline void queue_connect_payloads(void)
{
aclk_query_t query = aclk_query_new(METADATA_INFO);
query->data.metadata_info.host = localhost;
query->data.metadata_info.initial_on_connect = 1;
aclk_queue_query(query);
query = aclk_query_new(METADATA_ALARMS);
query->data.metadata_alarms.initial_on_connect = 1;
aclk_queue_query(query);
}
static inline void mqtt_connected_actions(mqtt_wss_client client)
{
// TODO global vars?
usec_t now = now_realtime_usec();
aclk_session_sec = now / USEC_PER_SEC;
aclk_session_us = now % USEC_PER_SEC;
mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1);
aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_pubacks_per_conn = 0;
aclk_hello_msg(client);
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
error("Sending `connect` payload immediatelly as popcorning was finished already.");
queue_connect_payloads();
}
ACLK_SHARED_STATE_UNLOCK;
}
/* Waits until agent is ready or needs to exit
* @param client instance of mqtt_wss_client
* @param query_threads pointer to aclk_query_threads
* structure where to store data about started query threads
* @return 0 - Popcorning Finished - Agent STABLE,
* !0 - netdata_exit
*/
static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads)
{
time_t elapsed;
int need_wait;
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
if (elapsed >= ACLK_STABLE_TIMEOUT) {
aclk_shared_state.agent_state = AGENT_STABLE;
ACLK_SHARED_STATE_UNLOCK;
error("ACLK localhost popocorn finished");
if (unlikely(!query_threads->thread_list))
aclk_query_threads_start(query_threads, client);
queue_connect_payloads();
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
need_wait = ACLK_STABLE_TIMEOUT - elapsed;
error("ACLK localhost popocorn wait %d seconds longer", need_wait);
sleep(need_wait);
}
return 1;
}
void aclk_graceful_disconnect(mqtt_wss_client client)
{
error("Preparing to Gracefully Shutdown the ACLK");
aclk_queue_lock();
aclk_queue_flush();
aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
time_t t = now_monotonic_sec();
while (!mqtt_wss_service(client, 100)) {
if (now_monotonic_sec() - t >= 2) {
error("Wasn't able to gracefully shutdown ACLK in time!");
break;
}
if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
error("MQTT App Layer `disconnect` message sent successfully");
break;
}
}
aclk_stats_upd_online(0);
aclk_connected = 0;
error("Attempting to Gracefully Shutdown MQTT/WSS connection");
mqtt_wss_disconnect(client, 1000);
}
/* Block till aclk_reconnect_delay is satisifed or netdata_exit is signalled
* @return 0 - Go ahead and connect (delay expired)
* 1 - netdata_exit
*/
#define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
static int aclk_block_till_recon_allowed() {
// Handle reconnect exponential backoff
// fnc aclk_reconnect_delay comes from ACLK Legacy @amoss
// but has been modifed slightly (more randomness)
unsigned long recon_delay = aclk_reconnect_delay(1);
info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
if (netdata_exit)
return 1;
if (recon_delay > NETDATA_EXIT_POLL_MS) {
sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
recon_delay -= NETDATA_EXIT_POLL_MS;
continue;
}
sleep_usec(recon_delay * USEC_PER_MS);
recon_delay = 0;
}
return 0;
}
#define HTTP_PROXY_PREFIX "http://"
static void set_proxy(struct mqtt_wss_proxy *out)
{
ACLK_PROXY_TYPE pt;
const char *ptr = aclk_get_proxy(&pt);
char *tmp;
char *host;
if (pt != PROXY_TYPE_HTTP)
return;
out->port = 0;
if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
ptr += strlen(HTTP_PROXY_PREFIX);
if ((tmp = strchr(ptr, '@')))
ptr = tmp;
if ((tmp = strchr(ptr, '/'))) {
host = mallocz((tmp - ptr) + 1);
memcpy(host, ptr, (tmp - ptr));
host[tmp - ptr] = 0;
} else
host = strdupz(ptr);
if ((tmp = strchr(host, ':'))) {
*tmp = 0;
tmp++;
out->port = atoi(tmp);
}
if (out->port <= 0 || out->port > 65535)
out->port = 8080;
out->host = host;
out->type = MQTT_WSS_PROXY_HTTP;
}
/* Attempts to make a connection to MQTT broker over WSS
* @param client instance of mqtt_wss_client
* @return 0 - Successfull Connection,
* <0 - Irrecoverable Error -> Kill ACLK,
* >0 - netdata_exit
*/
#define CLOUD_BASE_URL_READ_RETRY 30
#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
#define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
#else
#define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
#endif
static int aclk_attempt_to_connect(mqtt_wss_client client)
{
char *aclk_hostname = NULL;
int aclk_port;
#ifndef ACLK_DISABLE_CHALLENGE
char *mqtt_otp_user = NULL;
char *mqtt_otp_pass = NULL;
#endif
json_object *lwt;
while (!netdata_exit) {
char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
if (cloud_base_url == NULL) {
error("Do not move the cloud base url out of post_conf_load!!");
return -1;
}
if (aclk_block_till_recon_allowed())
return 1;
info("Attempting connection now");
if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
sleep(CLOUD_BASE_URL_READ_RETRY);
continue;
}
struct mqtt_wss_proxy proxy_conf;
proxy_conf.type = MQTT_WSS_DIRECT;
set_proxy(&proxy_conf);
struct mqtt_connect_params mqtt_conn_params = {
.clientid = "anon",
.username = "anon",
.password = "anon",
.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA),
.will_msg = NULL,
.will_flags = MQTT_WSS_PUB_QOS2,
.keep_alive = 60
};
#ifndef ACLK_DISABLE_CHALLENGE
aclk_get_mqtt_otp(aclk_private_key, aclk_hostname, aclk_port, &mqtt_otp_user, &mqtt_otp_pass);
mqtt_conn_params.clientid = mqtt_otp_user;
mqtt_conn_params.username = mqtt_otp_user;
mqtt_conn_params.password = mqtt_otp_pass;
#endif
lwt = aclk_generate_disconnect(NULL);
mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
if (!mqtt_wss_connect(client, aclk_hostname, aclk_port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf)) {
json_object_put(lwt);
freez(aclk_hostname);
aclk_hostname = NULL;
info("MQTTWSS connection succeeded");
mqtt_connected_actions(client);
return 0;
}
freez(aclk_hostname);
aclk_hostname = NULL;
json_object_put(lwt);
error("Connect failed\n");
}
return 1;
}
/**
* Main agent cloud link thread
*
* This thread will simply call the main event loop that handles
* pending requests - both inbound and outbound
*
* @param ptr is a pointer to the netdata_static_thread structure.
*
* @return It always returns NULL
*/
void *aclk_main(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_stats_thread *stats_thread = NULL;
struct aclk_query_threads query_threads;
query_threads.thread_list = NULL;
ACLK_PROXY_TYPE proxy_type;
aclk_get_proxy(&proxy_type);
if (proxy_type == PROXY_TYPE_SOCKS5) {
error("SOCKS5 proxy is not supported by ACLK-NG yet.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
}
// This thread is unusual in that it cannot be cancelled by cancel_main_threads()
// as it must notify the far end that it shutdown gracefully and avoid the LWT.
netdata_thread_disable_cancelability();
#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
info("Killing ACLK thread -> cloud functionality has been disabled");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
#endif
aclk_popcorn_check_bump(); // start localhost popcorn timer
query_threads.count = read_query_thread_count();
if (wait_till_cloud_enabled())
goto exit;
if (wait_till_agent_claim_ready())
goto exit;
if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
if (aclk_stats_enabled) {
stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
netdata_thread_create(
stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
stats_thread);
}
// Keep reconnecting and talking until our time has come
// and the Grim Reaper (netdata_exit) calls
do {
if (aclk_attempt_to_connect(mqttwss_client))
goto exit_full;
// warning this assumes the popcorning is relative short (3s)
// if that changes call mqtt_wss_service from within
// to keep OpenSSL, WSS and MQTT connection alive
if (wait_popcorning_finishes(mqttwss_client, &query_threads))
goto exit_full;
if (!handle_connection(mqttwss_client)) {
aclk_stats_upd_online(0);
aclk_connected = 0;
}
} while (!netdata_exit);
aclk_graceful_disconnect(mqttwss_client);
exit_full:
// Tear Down
QUERY_THREAD_WAKEUP_ALL;
aclk_query_threads_cleanup(&query_threads);
if (aclk_stats_enabled) {
netdata_thread_join(*stats_thread->thread, NULL);
aclk_stats_thread_cleanup();
freez(stats_thread->thread);
freez(stats_thread);
}
free_topic_cache();
mqtt_wss_destroy(mqttwss_client);
exit:
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
}
// TODO this is taken over as workaround from old ACLK
// fix this in both old and new ACLK
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
void aclk_alarm_reload(void)
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return;
}
ACLK_SHARED_STATE_UNLOCK;
aclk_queue_query(aclk_query_new(METADATA_ALARMS));
}
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer;
json_object *msg;
if (host != localhost)
return 0;
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
ACLK_SHARED_STATE_UNLOCK;
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
health_alarm_entry2json_nolock(local_buffer, ae, host);
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
msg = json_tokener_parse(local_buffer->buffer);
struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE);
query->data.alarm_update = msg;
aclk_queue_query(query);
buffer_free(local_buffer);
return 0;
}
int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
struct aclk_query *query;
if (aclk_popcorn_check_bump())
return 0;
query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
if(create) {
query->data.chart_add_del.host = host;
query->data.chart_add_del.chart_name = strdupz(chart_name);
} else {
query->data.metadata_info.host = host;
query->data.metadata_info.initial_on_connect = 0;
}
aclk_queue_query(query);
return 0;
}
/*
* Add a new collector to the list
* If it exists, update the chart count
*/
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
return;
}
COLLECTOR_LOCK;
tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
if (unlikely(tmp_collector->count != 1)) {
COLLECTOR_UNLOCK;
return;
}
COLLECTOR_UNLOCK;
if (aclk_popcorn_check_bump())
return;
if (host != localhost)
return;
query = aclk_query_new(METADATA_INFO);
query->data.metadata_info.host = localhost; //TODO
query->data.metadata_info.initial_on_connect = 0;
aclk_queue_query(query);
query = aclk_query_new(METADATA_ALARMS);
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
/*
* Delete a collector from the list
* If the chart count reaches zero the collector will be removed
* from the list by calling del_collector.
*
* This function will release the memory used and schedule
* a cloud update
*/
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
return;
}
COLLECTOR_LOCK;
tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
if (unlikely(!tmp_collector || tmp_collector->count)) {
COLLECTOR_UNLOCK;
return;
}
debug(
D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
tmp_collector->count);
COLLECTOR_UNLOCK;
_free_collector(tmp_collector);
if (aclk_popcorn_check_bump())
return;
if (host != localhost)
return;
query = aclk_query_new(METADATA_INFO);
query->data.metadata_info.host = localhost; //TODO
query->data.metadata_info.initial_on_connect = 0;
aclk_queue_query(query);
query = aclk_query_new(METADATA_ALARMS);
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
struct label *add_aclk_host_labels(struct label *label) {
#ifdef ENABLE_ACLK
ACLK_PROXY_TYPE aclk_proxy;
char *proxy_str;
aclk_get_proxy(&aclk_proxy);
switch(aclk_proxy) {
case PROXY_TYPE_SOCKS5:
proxy_str = "SOCKS5";
break;
case PROXY_TYPE_HTTP:
proxy_str = "HTTP";
break;
default:
proxy_str = "none";
break;
}
return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#else
return label;
#endif
}

100
aclk/aclk.h Normal file
View File

@@ -0,0 +1,100 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ACLK_H
#define ACLK_H
typedef struct aclk_rrdhost_state {
char *claimed_id; // Claimed ID if host has one otherwise NULL
} aclk_rrdhost_state;
#include "../daemon/common.h"
#include "aclk_util.h"
// minimum and maximum supported version of ACLK
// in this version of agent
#define ACLK_VERSION_MIN 2
#define ACLK_VERSION_MAX 2
// Version negotiation messages have they own versioning
// this is also used for LWT message as we set that up
// before version negotiation
#define ACLK_VERSION_NEG_VERSION 1
// Maximum time to wait for version negotiation before aborting
// and defaulting to oldest supported version
#define VERSION_NEG_TIMEOUT 3
#if ACLK_VERSION_MIN > ACLK_VERSION_MAX
#error "ACLK_VERSION_MAX must be >= than ACLK_VERSION_MIN"
#endif
// Define ACLK Feature Version Boundaries Here
#define ACLK_V_COMPRESSION 2
// How many MQTT PUBACKs we need to get to consider connection
// stable for the purposes of TBEB (truncated binary exponential backoff)
#define ACLK_PUBACKS_CONN_STABLE 3
// TODO get rid of this shit
extern int aclk_disable_runtime;
extern int aclk_disable_single_updates;
extern int aclk_kill_link;
extern int aclk_connected;
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
void *aclk_main(void *ptr);
void aclk_single_update_disable();
void aclk_single_update_enable();
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
.config_section = NULL, \
.config_name = NULL, \
.enabled = 1, \
.thread = NULL, \
.init_routine = NULL, \
.start_routine = aclk_main },
extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
typedef enum aclk_agent_state {
AGENT_INITIALIZING,
AGENT_STABLE
} ACLK_AGENT_STATE;
extern struct aclk_shared_state {
ACLK_AGENT_STATE agent_state;
time_t last_popcorn_interrupt;
// read only while ACLK connected
// protect by lock otherwise
int version_neg;
usec_t version_neg_wait_till;
// To wait for `disconnect` message PUBACK
// when shuting down
// at the same time if > 0 we know link is
// shutting down
int mqtt_shutdown_msg_id;
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;
void aclk_alarm_reload(void);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
// TODO this is for bacward compatibility with ACLK legacy
#define ACLK_CMD_CHART 1
#define ACLK_CMD_CHARTDEL 0
/* Informs ACLK about created/deleted chart
* @param create 0 - if chart was deleted, other if chart created
*/
int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
struct label *add_aclk_host_labels(struct label *label);
#endif /* ACLK_H */

193
aclk/aclk_collector_list.c Normal file
View File

@@ -0,0 +1,193 @@
// SPDX-License-Identifier: GPL-3.0-or-later
// This is copied from Legacy ACLK, Original Autor: amoss
// TODO unmess this
#include "aclk_collector_list.h"
netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
struct _collector *collector_list = NULL;
/*
* Free a collector structure
*/
void _free_collector(struct _collector *collector)
{
if (likely(collector->plugin_name))
freez(collector->plugin_name);
if (likely(collector->module_name))
freez(collector->module_name);
if (likely(collector->hostname))
freez(collector->hostname);
freez(collector);
}
/*
* This will report the collector list
*
*/
#ifdef ACLK_DEBUG
static void _dump_collector_list()
{
struct _collector *tmp_collector;
COLLECTOR_LOCK;
info("DUMPING ALL COLLECTORS");
if (unlikely(!collector_list || !collector_list->next)) {
COLLECTOR_UNLOCK;
info("DUMPING ALL COLLECTORS -- nothing found");
return;
}
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
while (tmp_collector) {
info(
"COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
tmp_collector = tmp_collector->next;
}
info("DUMPING ALL COLLECTORS DONE");
COLLECTOR_UNLOCK;
}
#endif
/*
* This will cleanup the collector list
*
*/
void _reset_collector_list()
{
struct _collector *tmp_collector, *next_collector;
COLLECTOR_LOCK;
if (unlikely(!collector_list || !collector_list->next)) {
COLLECTOR_UNLOCK;
return;
}
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
collector_list->count = 0;
collector_list->next = NULL;
// We broke the link; we can unlock
COLLECTOR_UNLOCK;
while (tmp_collector) {
next_collector = tmp_collector->next;
_free_collector(tmp_collector);
tmp_collector = next_collector;
}
}
/*
* Find a collector (if it exists)
* Must lock before calling this
* If last_collector is not null, it will return the previous collector in the linked
* list (used in collector delete)
*/
static struct _collector *_find_collector(
const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
{
struct _collector *tmp_collector, *prev_collector;
uint32_t plugin_hash;
uint32_t module_hash;
uint32_t hostname_hash;
if (unlikely(!collector_list)) {
collector_list = callocz(1, sizeof(struct _collector));
return NULL;
}
if (unlikely(!collector_list->next))
return NULL;
plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
module_hash = module_name ? simple_hash(module_name) : 1;
hostname_hash = simple_hash(hostname);
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
prev_collector = collector_list;
while (tmp_collector) {
if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
(!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
(!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
if (unlikely(last_collector))
*last_collector = prev_collector;
return tmp_collector;
}
prev_collector = tmp_collector;
tmp_collector = tmp_collector->next;
}
return tmp_collector;
}
/*
* Called to delete a collector
* It will reduce the count (chart_count) and will remove it
* from the linked list if the count reaches zero
* The structure will be returned to the caller to free
* the resources
*
*/
struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector, *prev_collector = NULL;
tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
if (likely(tmp_collector)) {
--tmp_collector->count;
if (unlikely(!tmp_collector->count))
prev_collector->next = tmp_collector->next;
}
return tmp_collector;
}
/*
* Add a new collector (plugin / module) to the list
* If it already exists just update the chart count
*
* Lock before calling
*/
struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
if (unlikely(!tmp_collector)) {
tmp_collector = callocz(1, sizeof(struct _collector));
tmp_collector->hostname_hash = simple_hash(hostname);
tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
tmp_collector->hostname = strdupz(hostname);
tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
tmp_collector->next = collector_list->next;
collector_list->next = tmp_collector;
}
tmp_collector->count++;
debug(
D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
module_name ? module_name : "*", tmp_collector->count);
return tmp_collector;
}

View File

@@ -0,0 +1,39 @@
// SPDX-License-Identifier: GPL-3.0-or-later
// This is copied from Legacy ACLK, Original Autor: amoss
// TODO unmess this
#ifndef ACLK_COLLECTOR_LIST_H
#define ACLK_COLLECTOR_LIST_H
#include "libnetdata/libnetdata.h"
extern netdata_mutex_t collector_mutex;
#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
/*
* Maintain a list of collectors and chart count
* If all the charts of a collector are deleted
* then a new metadata dataset must be send to the cloud
*
*/
struct _collector {
time_t created;
uint32_t count; //chart count
uint32_t hostname_hash;
uint32_t plugin_hash;
uint32_t module_hash;
char *hostname;
char *plugin_name;
char *module_name;
struct _collector *next;
};
struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
void _reset_collector_list();
void _free_collector(struct _collector *collector);
#endif /* ACLK_COLLECTOR_LIST_H */

509
aclk/aclk_otp.c Normal file
View File

@@ -0,0 +1,509 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_otp.h"
#include "libnetdata/libnetdata.h"
#include "legacy/aclk_lws_https_client.h"
#include "../daemon/common.h"
#include "../mqtt_websockets/c-rbuf/include/ringbuffer.h"
struct dictionary_singleton {
char *key;
char *result;
};
static int json_extract_singleton(JSON_ENTRY *e)
{
struct dictionary_singleton *data = e->callback_data;
switch (e->type) {
case JSON_OBJECT:
case JSON_ARRAY:
break;
case JSON_STRING:
if (!strcmp(e->name, data->key)) {
data->result = strdupz(e->data.string);
break;
}
break;
case JSON_NUMBER:
case JSON_BOOLEAN:
case JSON_NULL:
break;
}
return 0;
}
// Base-64 decoder.
// Note: This is non-validating, invalid input will be decoded without an error.
// Challenges are packed into json strings so we don't skip newlines.
// Size errors (i.e. invalid input size or insufficient output space) are caught.
static size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
{
static char lookup[256];
static int first_time=1;
if (first_time)
{
first_time = 0;
for(int i=0; i<256; i++)
lookup[i] = -1;
for(int i='A'; i<='Z'; i++)
lookup[i] = i-'A';
for(int i='a'; i<='z'; i++)
lookup[i] = i-'a' + 26;
for(int i='0'; i<='9'; i++)
lookup[i] = i-'0' + 52;
lookup['+'] = 62;
lookup['/'] = 63;
}
if ((input_size & 3) != 0)
{
error("Can't decode base-64 input length %zu", input_size);
return 0;
}
size_t unpadded_size = (input_size/4) * 3;
if ( unpadded_size > output_size )
{
error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
return 0;
}
// Don't check padding within full quantums
for (size_t i = 0 ; i < input_size-4 ; i+=4 )
{
uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
output[0] = value >> 16;
output[1] = value >> 8;
output[2] = value;
//error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
output += 3;
input += 4;
}
// Handle padding only in last quantum
if (input[2] == '=') {
uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
output[0] = value >> 4;
//error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
return unpadded_size-2;
}
else if (input[3] == '=') {
uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
output[0] = value >> 10;
output[1] = value >> 2;
//error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
return unpadded_size-1;
}
else
{
uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
output[0] = value >> 16;
output[1] = value >> 8;
output[2] = value;
//error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
return unpadded_size;
}
}
static size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
{
uint32_t value;
static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
if ((input_size/3+1)*4 >= output_size)
{
error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
return 0;
}
size_t count = 0;
while (input_size>3)
{
value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
output[0] = lookup[value >> 18];
output[1] = lookup[(value >> 12) & 0x3f];
output[2] = lookup[(value >> 6) & 0x3f];
output[3] = lookup[value & 0x3f];
//error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
output += 4;
input += 3;
input_size -= 3;
count += 4;
}
switch (input_size)
{
case 2:
value = (input[0] << 10) + (input[1] << 2);
output[0] = lookup[(value >> 12) & 0x3f];
output[1] = lookup[(value >> 6) & 0x3f];
output[2] = lookup[value & 0x3f];
output[3] = '=';
//error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
count += 4;
break;
case 1:
value = input[0] << 4;
output[0] = lookup[(value >> 6) & 0x3f];
output[1] = lookup[value & 0x3f];
output[2] = '=';
output[3] = '=';
//error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
count += 4;
break;
case 0:
break;
}
return count;
}
static int private_decrypt(RSA *p_key, unsigned char * enc_data, int data_len, unsigned char *decrypted)
{
int result = RSA_private_decrypt( data_len, enc_data, decrypted, p_key, RSA_PKCS1_OAEP_PADDING);
if (result == -1) {
char err[512];
ERR_error_string_n(ERR_get_error(), err, sizeof(err));
error("Decryption of the challenge failed: %s", err);
}
return result;
}
typedef enum http_req_type {
HTTP_REQ_GET,
HTTP_REQ_POST
} http_req_type;
enum http_parse_state {
HTTP_PARSE_INITIAL = 0,
HTTP_PARSE_HEADERS,
HTTP_PARSE_CONTENT
};
typedef struct {
enum http_parse_state state;
int content_length;
int http_code;
} http_parse_ctx;
#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 }
#define NEED_MORE_DATA 0
#define PARSE_SUCCESS 1
#define PARSE_ERROR -1
#define HTTP_LINE_TERM "\x0D\x0A"
#define RESP_PROTO "HTTP/1.1 "
#define HTTP_KEYVAL_SEPARATOR ": "
#define HTTP_HDR_BUFFER_SIZE 256
#define PORT_STR_MAX_BYTES 7
static void process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const char *val)
{
// currently we care only about content-length
// but in future the way this is written
// it can be extended
if (!strcmp("content-length", key)) {
parse_ctx->content_length = atoi(val);
}
}
static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx)
{
int idx, idx_end;
char buf_key[HTTP_HDR_BUFFER_SIZE];
char buf_val[HTTP_HDR_BUFFER_SIZE];
char *ptr = buf_key;
if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx_end)) {
error("CRLF expected");
return 1;
}
char *separator = rbuf_find_bytes(buf, HTTP_KEYVAL_SEPARATOR, strlen(HTTP_KEYVAL_SEPARATOR), &idx);
if (!separator) {
error("Missing Key/Value separator");
return 1;
}
if (idx >= HTTP_HDR_BUFFER_SIZE) {
error("Key name is too long");
return 1;
}
rbuf_pop(buf, buf_key, idx);
buf_key[idx] = 0;
rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR));
idx_end -= strlen(HTTP_KEYVAL_SEPARATOR) + idx;
if (idx_end >= HTTP_HDR_BUFFER_SIZE) {
error("Value of key \"%s\" too long", buf_key);
return 1;
}
rbuf_pop(buf, buf_val, idx_end);
buf_val[idx_end] = 0;
rbuf_bump_tail(buf, strlen(HTTP_KEYVAL_SEPARATOR));
for (ptr = buf_key; *ptr; ptr++)
*ptr = tolower(*ptr);
process_http_hdr(parse_ctx, buf_key, buf_val);
return 0;
}
static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
{
int idx;
char rc[4];
do {
if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx))
return NEED_MORE_DATA;
switch (parse_ctx->state) {
case HTTP_PARSE_INITIAL:
if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) {
error("Expected response to start with \"%s\"", RESP_PROTO);
return PARSE_ERROR;
}
rbuf_bump_tail(buf, strlen(RESP_PROTO));
if (rbuf_pop(buf, rc, 4) != 4) {
error("Expected HTTP status code");
return PARSE_ERROR;
}
if (rc[3] != ' ') {
error("Expected space after HTTP return code");
return PARSE_ERROR;
}
rc[3] = 0;
parse_ctx->http_code = atoi(rc);
if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) {
error("HTTP code not in range 100 to 599");
return PARSE_ERROR;
}
rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
parse_ctx->state = HTTP_PARSE_HEADERS;
break;
case HTTP_PARSE_HEADERS:
if (!idx) {
parse_ctx->state = HTTP_PARSE_CONTENT;
rbuf_bump_tail(buf, strlen(HTTP_LINE_TERM));
break;
}
if (parse_http_hdr(buf, parse_ctx))
return PARSE_ERROR;
rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
break;
case HTTP_PARSE_CONTENT:
if (parse_ctx->content_length < 0) {
error("content-length missing and http headers ended");
return PARSE_ERROR;
}
if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length)
return PARSE_SUCCESS;
return NEED_MORE_DATA;
}
} while(1);
}
static int https_request(http_req_type method, char *host, int port, char *url, char *b, size_t b_size, char *payload)
{
struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
char sport[PORT_STR_MAX_BYTES];
size_t len = 0;
int rc = 1;
int ret;
char *ptr;
http_parse_ctx parse_ctx = HTTP_PARSE_CTX_INITIALIZER;
rbuf_t buffer = rbuf_create(b_size);
if (!buffer)
return 1;
snprintf(sport, PORT_STR_MAX_BYTES, "%d", port);
if (payload != NULL)
len = strlen(payload);
snprintf(
b,
b_size,
"%s %s HTTP/1.1\r\nHost: %s\r\nAccept: application/json\r\nContent-length: %zu\r\nAccept-Language: en-us\r\n"
"User-Agent: Netdata/rocks\r\n\r\n",
(method == HTTP_REQ_GET ? "GET" : "POST"), url, host, len);
if (payload != NULL)
strncat(b, payload, b_size - len);
len = strlen(b);
debug(D_ACLK, "Sending HTTPS req (%zu bytes): '%s'", len, b);
int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, sport, &timeout);
if (unlikely(sock == -1)) {
error("Handshake failed");
goto exit_buf;
}
SSL_CTX *ctx = security_initialize_openssl_client();
if (ctx==NULL) {
error("Cannot allocate SSL context");
goto exit_sock;
}
// Certificate chain: not updating the stores - do we need private CA roots?
// Calls to SSL_CTX_load_verify_locations would go here.
SSL *ssl = SSL_new(ctx);
if (ssl==NULL) {
error("Cannot allocate SSL");
goto exit_CTX;
}
SSL_set_fd(ssl, sock);
ret = SSL_connect(ssl);
if (ret != 1) {
error("SSL_connect() failed with err=%d", ret);
goto exit_SSL;
}
ret = SSL_write(ssl, b, len);
if (ret <= 0)
{
error("SSL_write() failed with err=%d", ret);
goto exit_SSL;
}
b[0] = 0;
do {
ptr = rbuf_get_linear_insert_range(buffer, &len);
ret = SSL_read(ssl, ptr, len - 1);
if (ret)
rbuf_bump_head(buffer, ret);
if (ret <= 0)
{
error("No response available - SSL_read()=%d", ret);
goto exit_FULL;
}
} while (!(ret = parse_http_response(buffer, &parse_ctx)));
if (ret != PARSE_SUCCESS) {
error("Error parsing HTTP response");
goto exit_FULL;
}
if (parse_ctx.http_code < 200 || parse_ctx.http_code >= 300) {
error("HTTP Response not Success (got %d)", parse_ctx.http_code);
goto exit_FULL;
}
len = rbuf_pop(buffer, b, b_size);
b[MIN(len, b_size-1)] = 0;
rc = 0;
exit_FULL:
exit_SSL:
SSL_free(ssl);
exit_CTX:
SSL_CTX_free(ctx);
exit_sock:
close(sock);
exit_buf:
rbuf_free(buffer);
return rc;
}
// aclk_get_mqtt_otp is slightly modified original code from @amoss
void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass)
{
char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
debug(D_ACLK, "Performing challenge-response sequence");
if (*mqtt_pass != NULL)
{
freez(*mqtt_pass);
*mqtt_pass = NULL;
}
// curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
// TODO - target host?
char *agent_id = is_agent_claimed();
if (agent_id == NULL)
{
error("Agent was not claimed - cannot perform challenge/response");
goto CLEANUP;
}
char url[1024];
sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url);
if (https_request(HTTP_REQ_GET, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
{
error("Challenge failed: %s", data_buffer);
goto CLEANUP;
}
struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
if (json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
{
freez(challenge.result);
error("Could not parse the json response with the challenge: %s", data_buffer);
goto CLEANUP;
}
if (challenge.result == NULL) {
error("Could not retrieve challenge from auth response: %s", data_buffer);
goto CLEANUP;
}
size_t challenge_len = strlen(challenge.result);
unsigned char decoded[512];
size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
unsigned char plaintext[4096]={};
int decrypted_length = private_decrypt(p_key, decoded, decoded_len, plaintext);
freez(challenge.result);
char encoded[512];
size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
encoded[encoded_len] = 0;
debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
char response_json[4096]={};
sprintf(response_json, "{\"response\":\"%s\"}", encoded);
debug(D_ACLK, "Password phase: %s",response_json);
// TODO - host
sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
if (https_request(HTTP_REQ_POST, aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
{
error("Challenge-response failed: %s", data_buffer);
goto CLEANUP;
}
debug(D_ACLK, "Password response from cloud: %s", data_buffer);
struct dictionary_singleton password = { .key = "password", .result = NULL };
if (json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
{
freez(password.result);
error("Could not parse the json response with the password: %s", data_buffer);
goto CLEANUP;
}
if (password.result == NULL ) {
error("Could not retrieve password from auth response");
goto CLEANUP;
}
if (*mqtt_pass != NULL )
freez(*mqtt_pass);
*mqtt_pass = password.result;
if (*mqtt_usr != NULL)
freez(*mqtt_usr);
*mqtt_usr = agent_id;
agent_id = NULL;
CLEANUP:
if (agent_id != NULL)
freez(agent_id);
freez(data_buffer);
return;
}

10
aclk/aclk_otp.h Normal file
View File

@@ -0,0 +1,10 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ACLK_OTP_H
#define ACLK_OTP_H
#include "../daemon/common.h"
void aclk_get_mqtt_otp(RSA *p_key, char *aclk_hostname, int port, char **mqtt_usr, char **mqtt_pass);
#endif /* ACLK_OTP_H */

295
aclk/aclk_query.c Normal file
View File

@@ -0,0 +1,295 @@
#include "aclk_query.h"
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk_tx_msgs.h"
#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
typedef struct aclk_query_handler {
aclk_query_type_t type;
char *name; // for logging purposes
int(*fnc)(mqtt_wss_client client, aclk_query_t query);
} aclk_query_handler;
static int info_metadata(mqtt_wss_client client, aclk_query_t query)
{
aclk_send_info_metadata(client,
!query->data.metadata_info.initial_on_connect,
query->data.metadata_info.host);
return 0;
}
static int alarms_metadata(mqtt_wss_client client, aclk_query_t query)
{
aclk_send_alarm_metadata(client,
!query->data.metadata_info.initial_on_connect);
return 0;
}
static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url)
{
usec_t t;
t = now_monotonic_high_precision_usec();
w->response.code = web_client_api_request_v1(host, w, url);
t = now_monotonic_high_precision_usec() - t;
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_q_process_total += t;
aclk_metrics_per_sample.cloud_q_process_count++;
if (aclk_metrics_per_sample.cloud_q_process_max < t)
aclk_metrics_per_sample.cloud_q_process_max = t;
ACLK_STATS_UNLOCK;
}
return t;
}
static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
{
int retval = 0;
usec_t t;
BUFFER *local_buffer = NULL;
#ifdef NETDATA_WITH_ZLIB
int z_ret;
BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
char *start, *end;
#endif
struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
w->acl = 0x1f;
char *mysep = strchr(query->data.http_api_v2.query, '?');
if (mysep) {
url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
*mysep = '\0';
} else
url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
mysep = strrchr(query->data.http_api_v2.query, '/');
// execute the query
t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop");
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
start += strlen(WEB_HDR_ACCEPT_ENC);
end = strstr(start, "\x0D\x0A");
start = strstr(start, "gzip");
if (start && start < end) {
w->response.zstream.zalloc = Z_NULL;
w->response.zstream.zfree = Z_NULL;
w->response.zstream.opaque = Z_NULL;
if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
w->response.zinitialized = 1;
w->response.zoutput = 1;
} else
error("Failed to initialize zlib. Proceeding without compression.");
}
}
if (w->response.data->len && w->response.zinitialized) {
w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
w->response.zstream.avail_in = w->response.data->len;
do {
w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
w->response.zstream.next_out = w->response.zbuffer;
z_ret = deflate(&w->response.zstream, Z_FINISH);
if(z_ret < 0) {
if(w->response.zstream.msg)
error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg);
else
error("Unknown error during zlib compression.");
retval = 1;
goto cleanup;
}
int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
buffer_need_bytes(z_buffer, bytes_to_cpy);
memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
z_buffer->len += bytes_to_cpy;
} while(z_ret != Z_STREAM_END);
// so that web_client_build_http_header
// puts correct content lenght into header
buffer_free(w->response.data);
w->response.data = z_buffer;
z_buffer = NULL;
}
#endif
now_realtime_timeval(&w->tv_ready);
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w);
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
local_buffer->contenttype = CT_APPLICATION_JSON;
buffer_strcat(local_buffer, w->response.header_output->buffer);
if (w->response.data->len) {
#ifdef NETDATA_WITH_ZLIB
if (w->response.zinitialized) {
buffer_need_bytes(local_buffer, w->response.data->len);
memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
local_buffer->len += w->response.data->len;
} else {
#endif
buffer_strcat(local_buffer, w->response.data->buffer);
#ifdef NETDATA_WITH_ZLIB
}
#endif
}
aclk_http_msg_v2(client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
cleanup:
#ifdef NETDATA_WITH_ZLIB
if(w->response.zinitialized)
deflateEnd(&w->response.zstream);
buffer_free(z_buffer);
#endif
buffer_free(w->response.data);
buffer_free(w->response.header);
buffer_free(w->response.header_output);
freez(w);
buffer_free(local_buffer);
return retval;
}
static int chart_query(mqtt_wss_client client, aclk_query_t query)
{
aclk_chart_msg(client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name);
return 0;
}
static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query)
{
aclk_alarm_state_msg(client, query->data.alarm_update);
// aclk_alarm_state_msg frees the json object including the header it generates
query->data.alarm_update = NULL;
return 0;
}
aclk_query_handler aclk_query_handlers[] = {
{ .type = HTTP_API_V2, .name = "http api request v2", .fnc = http_api_v2 },
{ .type = ALARM_STATE_UPDATE, .name = "alarm state update", .fnc = alarm_state_update_query },
{ .type = METADATA_INFO, .name = "info metadata", .fnc = info_metadata },
{ .type = METADATA_ALARMS, .name = "alarms metadata", .fnc = alarms_metadata },
{ .type = CHART_NEW, .name = "chart new", .fnc = chart_query },
{ .type = CHART_DEL, .name = "chart delete", .fnc = info_metadata },
{ .type = UNKNOWN, .name = NULL, .fnc = NULL }
};
static void aclk_query_process_msg(struct aclk_query_thread *info, aclk_query_t query)
{
for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
if (aclk_query_handlers[i].type == query->type) {
debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
aclk_query_handlers[i].fnc(info->client, query);
aclk_query_free(query);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[info->idx]++;
ACLK_STATS_UNLOCK;
}
return;
}
}
fatal("Unknown query in query queue. %u", query->type);
}
/* Processes messages from queue. Compete for work with other threads
*/
int aclk_query_process_msgs(struct aclk_query_thread *info)
{
aclk_query_t query;
while ((query = aclk_queue_pop()))
aclk_query_process_msg(info, query);
return 0;
}
/**
* Main query processing thread
*/
void *aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (unlikely(!aclk_shared_state.version_neg)) {
if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
ACLK_SHARED_STATE_UNLOCK;
info("Waiting for ACLK Version Negotiation message from Cloud");
sleep(1);
continue;
}
errno = 0;
error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
" Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
aclk_shared_state.version_neg = ACLK_VERSION_MIN;
// When ACLK v3 is implemented you will need this
// aclk_set_rx_handlers(aclk_shared_state.version_neg);
}
ACLK_SHARED_STATE_UNLOCK;
aclk_query_process_msgs(info);
QUERY_THREAD_LOCK;
if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
sleep_usec(USEC_PER_SEC * 1);
QUERY_THREAD_UNLOCK;
}
return NULL;
}
#define TASK_LEN_MAX 16
void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client)
{
info("Starting %d query threads.", query_threads->count);
char thread_name[TASK_LEN_MAX];
query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
for (int i = 0; i < query_threads->count; i++) {
query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
error("snprintf encoding error");
netdata_thread_create(
&query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
&query_threads->thread_list[i]);
query_threads->thread_list[i].client = client;
}
}
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
{
if (query_threads && query_threads->thread_list) {
for (int i = 0; i < query_threads->count; i++) {
netdata_thread_join(query_threads->thread_list[i].thread, NULL);
}
freez(query_threads->thread_list);
}
aclk_queue_lock();
aclk_queue_flush();
}

32
aclk/aclk_query.h Normal file
View File

@@ -0,0 +1,32 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_ACLK_QUERY_H
#define NETDATA_ACLK_QUERY_H
#include "libnetdata/libnetdata.h"
#include "mqtt_wss_client.h"
extern pthread_cond_t query_cond_wait;
extern pthread_mutex_t query_lock_wait;
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
// TODO
//extern volatile int aclk_connected;
struct aclk_query_thread {
netdata_thread_t thread;
int idx;
mqtt_wss_client client;
};
struct aclk_query_threads {
struct aclk_query_thread *thread_list;
int count;
};
void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
#endif //NETDATA_AGENT_CLOUD_LINK_H

128
aclk/aclk_query_queue.c Normal file
View File

@@ -0,0 +1,128 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_query_queue.h"
#include "aclk_query.h"
#include "aclk_stats.h"
static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex)
#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex)
static struct aclk_query_queue {
aclk_query_t head;
aclk_query_t tail;
int block_push;
} aclk_query_queue = {
.head = NULL,
.tail = NULL,
.block_push = 0
};
static inline int _aclk_queue_query(aclk_query_t query)
{
query->created = now_realtime_usec();
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
if(!netdata_exit)
error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
aclk_query_free(query);
return 1;
}
if (!aclk_query_queue.head) {
aclk_query_queue.head = query;
aclk_query_queue.tail = query;
ACLK_QUEUE_UNLOCK;
return 0;
}
// TODO deduplication
aclk_query_queue.tail->next = query;
aclk_query_queue.tail = query;
ACLK_QUEUE_UNLOCK;
return 0;
}
int aclk_queue_query(aclk_query_t query)
{
int ret = _aclk_queue_query(query);
if (!ret) {
QUERY_THREAD_WAKEUP;
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_queued++;
ACLK_STATS_UNLOCK;
}
}
return ret;
}
aclk_query_t aclk_queue_pop(void)
{
aclk_query_t ret;
ACLK_QUEUE_LOCK;
if (aclk_query_queue.block_push) {
ACLK_QUEUE_UNLOCK;
if(!netdata_exit)
error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
return NULL;
}
ret = aclk_query_queue.head;
if (!ret) {
ACLK_QUEUE_UNLOCK;
return ret;
}
aclk_query_queue.head = ret->next;
if (unlikely(!aclk_query_queue.head))
aclk_query_queue.tail = aclk_query_queue.head;
ACLK_QUEUE_UNLOCK;
ret->next = NULL;
return ret;
}
void aclk_queue_flush(void)
{
aclk_query_t query = aclk_queue_pop();
while (query) {
aclk_query_free(query);
query = aclk_queue_pop();
};
}
aclk_query_t aclk_query_new(aclk_query_type_t type)
{
aclk_query_t query = callocz(1, sizeof(struct aclk_query));
query->type = type;
return query;
}
void aclk_query_free(aclk_query_t query)
{
if (query->type == HTTP_API_V2) {
freez(query->data.http_api_v2.payload);
if (query->data.http_api_v2.query != query->dedup_id)
freez(query->data.http_api_v2.query);
}
if (query->type == CHART_NEW)
freez(query->data.chart_add_del.chart_name);
if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
json_object_put(query->data.alarm_update);
freez(query->dedup_id);
freez(query->callback_topic);
freez(query->msg_id);
freez(query);
}
void aclk_queue_lock(void)
{
ACLK_QUEUE_LOCK;
aclk_query_queue.block_push = 1;
ACLK_QUEUE_UNLOCK;
}

71
aclk/aclk_query_queue.h Normal file
View File

@@ -0,0 +1,71 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_ACLK_QUERY_QUEUE_H
#define NETDATA_ACLK_QUERY_QUEUE_H
#include "libnetdata/libnetdata.h"
#include "../daemon/common.h"
typedef enum {
UNKNOWN,
METADATA_INFO,
METADATA_ALARMS,
HTTP_API_V2,
CHART_NEW,
CHART_DEL,
ALARM_STATE_UPDATE
} aclk_query_type_t;
struct aclk_query_metadata {
RRDHOST *host;
int initial_on_connect;
};
struct aclk_query_chart_add_del {
RRDHOST *host;
char* chart_name;
};
struct aclk_query_http_api_v2 {
char *payload;
char *query;
};
typedef struct aclk_query *aclk_query_t;
struct aclk_query {
aclk_query_type_t type;
// dedup_id is used to deduplicate queries in the list
// if type and dedup_id is the same message is deduplicated
// set dedup_id to NULL to never deduplicate the message
// set dedup_id to constant (e.g. empty string "") to make
// message of this type ever exist only once in the list
char *dedup_id;
char *callback_topic;
char *msg_id;
usec_t created;
aclk_query_t next;
// TODO maybe remove?
int version;
union {
struct aclk_query_metadata metadata_info;
struct aclk_query_metadata metadata_alarms;
struct aclk_query_http_api_v2 http_api_v2;
struct aclk_query_chart_add_del chart_add_del;
json_object *alarm_update;
} data;
};
aclk_query_t aclk_query_new(aclk_query_type_t type);
void aclk_query_free(aclk_query_t query);
int aclk_queue_query(aclk_query_t query);
aclk_query_t aclk_queue_pop(void);
void aclk_queue_flush(void);
void aclk_queue_lock(void);
#endif /* NETDATA_ACLK_QUERY_QUEUE_H */

343
aclk/aclk_rx_msgs.c Normal file
View File

@@ -0,0 +1,343 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_rx_msgs.h"
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/"
struct aclk_request {
char *type_id;
char *msg_id;
char *callback_topic;
char *payload;
int version;
int min_version;
int max_version;
};
int cloud_to_agent_parse(JSON_ENTRY *e)
{
struct aclk_request *data = e->callback_data;
switch (e->type) {
case JSON_OBJECT:
case JSON_ARRAY:
break;
case JSON_STRING:
if (!strcmp(e->name, "msg-id")) {
data->msg_id = strdupz(e->data.string);
break;
}
if (!strcmp(e->name, "type")) {
data->type_id = strdupz(e->data.string);
break;
}
if (!strcmp(e->name, "callback-topic")) {
data->callback_topic = strdupz(e->data.string);
break;
}
if (!strcmp(e->name, "payload")) {
if (likely(e->data.string)) {
size_t len = strlen(e->data.string);
data->payload = mallocz(len+1);
if (!url_decode_r(data->payload, e->data.string, len + 1))
strcpy(data->payload, e->data.string);
}
break;
}
break;
case JSON_NUMBER:
if (!strcmp(e->name, "version")) {
data->version = e->data.number;
break;
}
if (!strcmp(e->name, "min-version")) {
data->min_version = e->data.number;
break;
}
if (!strcmp(e->name, "max-version")) {
data->max_version = e->data.number;
break;
}
break;
case JSON_BOOLEAN:
break;
case JSON_NULL:
break;
}
return 0;
}
static inline int aclk_extract_v2_data(char *payload, char **data)
{
char* ptr = strstr(payload, ACLK_V2_PAYLOAD_SEPARATOR);
if(!ptr)
return 1;
ptr += strlen(ACLK_V2_PAYLOAD_SEPARATOR);
*data = strdupz(ptr);
return 0;
}
static inline int aclk_v2_payload_get_query(const char *payload, char **query_url)
{
const char *start, *end;
if(strncmp(payload, ACLK_CLOUD_REQ_V2_PREFIX, strlen(ACLK_CLOUD_REQ_V2_PREFIX))) {
errno = 0;
error("Only accepting requests that start with \"%s\" from CLOUD.", ACLK_CLOUD_REQ_V2_PREFIX);
return 1;
}
start = payload + 4;
if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) {
errno = 0;
error("Doesn't look like HTTP GET request.");
return 1;
}
*query_url = mallocz((end - start) + 1);
strncpyz(*query_url, start, end - start);
return 0;
}
#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
ACLK_SHARED_STATE_UNLOCK;\
return 1;\
}\
ACLK_SHARED_STATE_UNLOCK;
static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, char *raw_payload)
{
HTTP_CHECK_AGENT_INITIALIZED();
aclk_query_t query;
errno = 0;
if (cloud_to_agent->version < ACLK_V_COMPRESSION) {
error(
"This handler cannot reply to request with version older than %d, received %d.",
ACLK_V_COMPRESSION,
cloud_to_agent->version);
return 1;
}
query = aclk_query_new(HTTP_API_V2);
if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) {
error("Error extracting payload expected after the JSON dictionary.");
goto error;
}
if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) {
error("Could not extract payload from query");
goto error;
}
if (unlikely(!cloud_to_agent->callback_topic)) {
error("Missing callback_topic");
goto error;
}
if (unlikely(!cloud_to_agent->msg_id)) {
error("Missing msg_id");
goto error;
}
// aclk_queue_query takes ownership of data pointer
query->callback_topic = cloud_to_agent->callback_topic;
// for clarity and code readability as when we process the request
// it would be strange to get URL from `dedup_id`
query->data.http_api_v2.query = query->dedup_id;
query->msg_id = cloud_to_agent->msg_id;
aclk_queue_query(query);
return 0;
error:
aclk_query_free(query);
return 1;
}
// This handles `version` message from cloud used to negotiate
// protocol version we will use
static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, char *raw_payload)
{
UNUSED(raw_payload);
int version = -1;
errno = 0;
if (unlikely(cloud_to_agent->version != ACLK_VERSION_NEG_VERSION)) {
error(
"Unsuported version of \"version\" message from cloud. Expected %d, Got %d",
ACLK_VERSION_NEG_VERSION,
cloud_to_agent->version);
return 1;
}
if (unlikely(!cloud_to_agent->min_version)) {
error("Min version missing or 0");
return 1;
}
if (unlikely(!cloud_to_agent->max_version)) {
error("Max version missing or 0");
return 1;
}
if (unlikely(cloud_to_agent->max_version < cloud_to_agent->min_version)) {
error(
"Max version (%d) must be >= than min version (%d)", cloud_to_agent->max_version,
cloud_to_agent->min_version);
return 1;
}
if (unlikely(cloud_to_agent->min_version > ACLK_VERSION_MAX)) {
error(
"Agent too old for this cloud. Minimum version required by cloud %d."
" Maximum version supported by this agent %d.",
cloud_to_agent->min_version, ACLK_VERSION_MAX);
aclk_kill_link = 1;
aclk_disable_runtime = 1;
return 1;
}
if (unlikely(cloud_to_agent->max_version < ACLK_VERSION_MIN)) {
error(
"Cloud version is too old for this agent. Maximum version supported by cloud %d."
" Minimum (oldest) version supported by this agent %d.",
cloud_to_agent->max_version, ACLK_VERSION_MIN);
aclk_kill_link = 1;
return 1;
}
version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
ACLK_SHARED_STATE_LOCK;
if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
errno = 0;
error("The \"version\" message came too late ignoring.");
goto err_cleanup;
}
if (unlikely(aclk_shared_state.version_neg)) {
errno = 0;
error("Version has already been set to %d", aclk_shared_state.version_neg);
goto err_cleanup;
}
aclk_shared_state.version_neg = version;
ACLK_SHARED_STATE_UNLOCK;
info("Choosing version %d of ACLK", version);
aclk_set_rx_handlers(version);
return 0;
err_cleanup:
ACLK_SHARED_STATE_UNLOCK;
return 1;
}
typedef struct aclk_incoming_msg_type{
char *name;
int(*fnc)(struct aclk_request *, char *);
}aclk_incoming_msg_type;
aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v2 },
{ .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
void aclk_set_rx_handlers(int version)
{
// ACLK_NG ACLK version support starts at 2
// TODO ACLK v3
UNUSED(version);
aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
}
int aclk_handle_cloud_message(char *payload)
{
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_req_recvd++;
ACLK_STATS_UNLOCK;
}
if (unlikely(!payload)) {
errno = 0;
error("ACLK incoming message is empty");
goto err_cleanup_nojson;
}
debug(D_ACLK, "ACLK incoming message (%s)", payload);
int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
if (unlikely(rc != JSON_OK)) {
errno = 0;
error("Malformed json request (%s)", payload);
goto err_cleanup;
}
if (!cloud_to_agent.type_id) {
errno = 0;
error("Cloud message is missing compulsory key \"type\"");
goto err_cleanup;
}
if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
goto err_cleanup;
}
for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
// in case of success handler is supposed to clean up after itself
// or as in the case of aclk_handle_cloud_request take
// ownership of the pointers (done to avoid copying)
// see what `aclk_queue_query` parameter `internal` does
// NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
// msg handlers (namely aclk_handle_version_responce)
// can freely change what aclk_incoming_msg_types points to
// so either exit or restart this for loop
freez(cloud_to_agent.type_id);
return 0;
}
goto err_cleanup;
}
}
errno = 0;
error("Unknown message type from Cloud \"%s\"", cloud_to_agent.type_id);
err_cleanup:
if (cloud_to_agent.payload)
freez(cloud_to_agent.payload);
if (cloud_to_agent.type_id)
freez(cloud_to_agent.type_id);
if (cloud_to_agent.msg_id)
freez(cloud_to_agent.msg_id);
if (cloud_to_agent.callback_topic)
freez(cloud_to_agent.callback_topic);
err_cleanup_nojson:
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_req_err++;
ACLK_STATS_UNLOCK;
}
return 1;
}

14
aclk/aclk_rx_msgs.h Normal file
View File

@@ -0,0 +1,14 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ACLK_RX_MSGS_H
#define ACLK_RX_MSGS_H
#include "../daemon/common.h"
#include "libnetdata/libnetdata.h"
int aclk_handle_cloud_message(char *payload);
void aclk_set_rx_handlers(int version);
#endif /* ACLK_RX_MSGS_H */

274
aclk/aclk_stats.c Normal file
View File

@@ -0,0 +1,274 @@
#include "aclk_stats.h"
netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_stats_enabled;
int query_thread_count;
// data ACLK stats need per query thread
struct aclk_qt_data {
RRDDIM *dim;
} *aclk_qt_data = NULL;
uint32_t *aclk_queries_per_thread = NULL;
uint32_t *aclk_queries_per_thread_sample = NULL;
struct aclk_metrics aclk_metrics = {
.online = 0,
};
struct aclk_metrics_per_sample aclk_metrics_per_sample;
static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent)
{
static RRDSET *st_aclkstats = NULL;
static RRDDIM *rd_online_status = NULL;
if (unlikely(!st_aclkstats)) {
st_aclkstats = rrdset_create_localhost(
"netdata", "aclk_status", NULL, "aclk", NULL, "ACLK/Cloud connection status",
"connected", "netdata", "stats", 200000, localhost->rrd_update_every, RRDSET_TYPE_LINE);
rd_online_status = rrddim_add(st_aclkstats, "online", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st_aclkstats);
rrddim_set_by_pointer(st_aclkstats, rd_online_status, per_sample->offline_during_sample ? 0 : permanent->online);
rrdset_done(st_aclkstats);
}
static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st_query_thread = NULL;
static RRDDIM *rd_queued = NULL;
static RRDDIM *rd_dispatched = NULL;
if (unlikely(!st_query_thread)) {
st_query_thread = rrdset_create_localhost(
"netdata", "aclk_query_per_second", NULL, "aclk", NULL, "ACLK Queries per second", "queries/s",
"netdata", "stats", 200001, localhost->rrd_update_every, RRDSET_TYPE_AREA);
rd_queued = rrddim_add(st_query_thread, "added", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_dispatched = rrddim_add(st_query_thread, "dispatched", NULL, -1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st_query_thread);
rrddim_set_by_pointer(st_query_thread, rd_queued, per_sample->queries_queued);
rrddim_set_by_pointer(st_query_thread, rd_dispatched, per_sample->queries_dispatched);
rrdset_done(st_query_thread);
}
#ifdef NETDATA_INTERNAL_CHECKS
static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_avg = NULL;
static RRDDIM *rd_max = NULL;
if (unlikely(!st)) {
st = rrdset_create_localhost(
"netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms",
"netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE);
rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st);
if(per_sample->latency_count)
rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count));
else
rrddim_set_by_pointer(st, rd_avg, 0);
rrddim_set_by_pointer(st, rd_max, per_sample->latency_max);
rrdset_done(st);
}
#endif
static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_rcvd = NULL;
static RRDDIM *rd_rq_err = NULL;
if (unlikely(!st)) {
st = rrdset_create_localhost(
"netdata", "aclk_cloud_req", NULL, "aclk", NULL, "Requests received from cloud", "req/s",
"netdata", "stats", 200005, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
rd_rq_rcvd = rrddim_add(st, "received", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_rq_err = rrddim_add(st, "malformed", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st);
rrddim_set_by_pointer(st, rd_rq_rcvd, per_sample->cloud_req_recvd - per_sample->cloud_req_err);
rrddim_set_by_pointer(st, rd_rq_err, per_sample->cloud_req_err);
rrdset_done(st);
}
#define MAX_DIM_NAME 16
static void aclk_stats_query_threads(uint32_t *queries_per_thread)
{
static RRDSET *st = NULL;
char dim_name[MAX_DIM_NAME];
if (unlikely(!st)) {
st = rrdset_create_localhost(
"netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
"netdata", "stats", 200007, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
for (int i = 0; i < query_thread_count; i++) {
if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
error("snprintf encoding error");
aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
}
} else
rrdset_next(st);
for (int i = 0; i < query_thread_count; i++) {
rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
}
rrdset_done(st);
}
static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_avg = NULL;
static RRDDIM *rd_rq_max = NULL;
static RRDDIM *rd_rq_total = NULL;
if (unlikely(!st)) {
st = rrdset_create_localhost(
"netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us",
"netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE);
rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(st);
if(per_sample->cloud_q_process_count)
rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count));
else
rrddim_set_by_pointer(st, rd_rq_avg, 0);
rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max);
rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total);
rrdset_done(st);
}
void aclk_stats_thread_cleanup()
{
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
}
void *aclk_stats_main_thread(void *ptr)
{
struct aclk_stats_thread *args = ptr;
query_thread_count = args->query_thread_count;
aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
heartbeat_t hb;
heartbeat_init(&hb);
usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
while (!netdata_exit) {
netdata_thread_testcancel();
// ------------------------------------------------------------------------
// Wait for the next iteration point.
heartbeat_next(&hb, step_ut);
if (netdata_exit) break;
ACLK_STATS_LOCK;
// to not hold lock longer than necessary, especially not to hold it
// during database rrd* operations
memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
ACLK_STATS_UNLOCK;
aclk_stats_collect(&per_sample, &permanent);
aclk_stats_query_queue(&per_sample);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_latency(&per_sample);
#endif
aclk_stats_cloud_req(&per_sample);
aclk_stats_query_threads(aclk_queries_per_thread_sample);
aclk_stats_query_time(&per_sample);
}
return 0;
}
void aclk_stats_upd_online(int online) {
if(!aclk_stats_enabled)
return;
ACLK_STATS_LOCK;
aclk_metrics.online = online;
if(!online)
aclk_metrics_per_sample.offline_during_sample = 1;
ACLK_STATS_UNLOCK;
}
#ifdef NETDATA_INTERNAL_CHECKS
static usec_t pub_time[UINT16_MAX];
void aclk_stats_msg_published(uint16_t id)
{
ACLK_STATS_LOCK;
pub_time[id] = now_boottime_usec();
ACLK_STATS_UNLOCK;
}
void aclk_stats_msg_puback(uint16_t id)
{
ACLK_STATS_LOCK;
usec_t t;
if (!aclk_stats_enabled) {
ACLK_STATS_UNLOCK;
return;
}
if (unlikely(!pub_time[id])) {
ACLK_STATS_UNLOCK;
error("Received PUBACK for unknown message?!");
return;
}
t = now_boottime_usec() - pub_time[id];
t /= USEC_PER_MS;
pub_time[id] = 0;
if (aclk_metrics_per_sample.latency_max < t)
aclk_metrics_per_sample.latency_max = t;
aclk_metrics_per_sample.latency_total += t;
aclk_metrics_per_sample.latency_count++;
ACLK_STATS_UNLOCK;
}
#endif /* NETDATA_INTERNAL_CHECKS */

64
aclk/aclk_stats.h Normal file
View File

@@ -0,0 +1,64 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_ACLK_STATS_H
#define NETDATA_ACLK_STATS_H
#include "../daemon/common.h"
#include "libnetdata/libnetdata.h"
#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
extern netdata_mutex_t aclk_stats_mutex;
#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
extern int aclk_stats_enabled;
struct aclk_stats_thread {
netdata_thread_t *thread;
int query_thread_count;
};
// preserve between samples
struct aclk_metrics {
volatile uint8_t online;
};
// reset to 0 on every sample
extern struct aclk_metrics_per_sample {
/* in the unlikely event of ACLK disconnecting
and reconnecting under 1 sampling rate
we want to make sure we record the disconnection
despite it being then seemingly longer in graph */
volatile uint8_t offline_during_sample;
volatile uint32_t queries_queued;
volatile uint32_t queries_dispatched;
#ifdef NETDATA_INTERNAL_CHECKS
volatile uint32_t latency_max;
volatile uint32_t latency_total;
volatile uint32_t latency_count;
#endif
volatile uint32_t cloud_req_recvd;
volatile uint32_t cloud_req_err;
volatile uint32_t cloud_q_process_total;
volatile uint32_t cloud_q_process_count;
volatile uint32_t cloud_q_process_max;
} aclk_metrics_per_sample;
extern uint32_t *aclk_queries_per_thread;
void *aclk_stats_main_thread(void *ptr);
void aclk_stats_thread_cleanup();
void aclk_stats_upd_online(int online);
#ifdef NETDATA_INTERNAL_CHECKS
void aclk_stats_msg_published(uint16_t id);
void aclk_stats_msg_puback(uint16_t id);
#endif /* NETDATA_INTERNAL_CHECKS */
#endif /* NETDATA_ACLK_STATS_H */

395
aclk/aclk_tx_msgs.c Normal file
View File

@@ -0,0 +1,395 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_tx_msgs.h"
#include "../daemon/common.h"
#include "aclk_util.h"
#include "aclk_stats.h"
#ifndef __GNUC__
#pragma region aclk_tx_msgs helper functions
#endif
static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 1024
char filename[FN_MAX_LEN];
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif
}
static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 1024
char filename[FN_MAX_LEN];
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif
return packet_id;
}
/* UNUSED now but can be used soon MVP1?
static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
{
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
return;
}
const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published();
#endif
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 1024
char filename[FN_MAX_LEN];
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif
}
*/
#define TOPIC_MAX_LEN 512
#define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
static void aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
{
uint16_t packet_id;
const char *str;
char *full_msg;
int len;
if (unlikely(!topic || topic[0] != '/')) {
error ("Full topic required!");
return;
}
str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
len = strlen(str);
full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
memcpy(full_msg, str, len);
memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
memcpy(&full_msg[len], payload, payload_len);
len += payload_len;
/* TODO
#ifdef ACLK_LOG_CONVERSATION_DIR
#define FN_MAX_LEN 1024
char filename[FN_MAX_LEN];
snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
#endif */
mqtt_wss_publish_pid(client, topic, full_msg, len, MQTT_WSS_PUB_QOS1, &packet_id);
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
#endif
freez(full_msg);
}
/*
* Creates universal header common for all ACLK messages. User gets ownership of json object created.
* Usually this is freed by send function after message has been sent.
*/
static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
{
uuid_t uuid;
char uuid_str[36 + 1];
json_object *tmp;
json_object *obj = json_object_new_object();
tmp = json_object_new_string(type);
json_object_object_add(obj, "type", tmp);
if (unlikely(!msg_id)) {
uuid_generate(uuid);
uuid_unparse(uuid, uuid_str);
msg_id = uuid_str;
}
if (ts_secs == 0) {
ts_us = now_realtime_usec();
ts_secs = ts_us / USEC_PER_SEC;
ts_us = ts_us % USEC_PER_SEC;
}
tmp = json_object_new_string(msg_id);
json_object_object_add(obj, "msg-id", tmp);
tmp = json_object_new_int64(ts_secs);
json_object_object_add(obj, "timestamp", tmp);
// TODO handle this somehow on older json-c
// tmp = json_object_new_uint64(ts_us);
// probably jso->_to_json_strinf -> custom function
// jso->o.c_uint64 -> map this with pointer to signed int
// commit that implements json_object_new_uint64 is 3c3b592
// between 0.14 and 0.15
tmp = json_object_new_int64(ts_us);
json_object_object_add(obj, "timestamp-offset-usec", tmp);
tmp = json_object_new_int64(aclk_session_sec);
json_object_object_add(obj, "connect", tmp);
// TODO handle this somehow see above
// tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
tmp = json_object_new_int64(aclk_session_us);
json_object_object_add(obj, "connect-offset-usec", tmp);
tmp = json_object_new_int(version);
json_object_object_add(obj, "version", tmp);
return obj;
}
static char *create_uuid()
{
uuid_t uuid;
char *uuid_str = mallocz(36 + 1);
uuid_generate(uuid);
uuid_unparse(uuid, uuid_str);
return uuid_str;
}
#ifndef __GNUC__
#pragma endregion
#endif
#ifndef __GNUC__
#pragma region aclk_tx_msgs message generators
#endif
/*
* This will send the /api/v1/info
*/
#define BUFFER_INITIAL_SIZE (1024 * 16)
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
{
BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
json_object *msg, *payload, *tmp;
char *msg_id = create_uuid();
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
// on_connect messages are sent on a health reload, if the on_connect message is real then we
// use the session time as the fake timestamp to indicate that it starts the session. If it is
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted)
msg = create_hdr("update", msg_id, 0, 0, aclk_shared_state.version_neg);
else
msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
web_client_api_request_v1_info_fill_buffer(host, local_buffer);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "info", tmp);
buffer_flush(local_buffer);
charts2json(host, local_buffer, 1, 0);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "charts", tmp);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
json_object_put(msg);
freez(msg_id);
buffer_free(local_buffer);
}
// TODO should include header instead
void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
{
BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
json_object *msg, *payload, *tmp;
char *msg_id = create_uuid();
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
// on_connect messages are sent on a health reload, if the on_connect message is real then we
// use the session time as the fake timestamp to indicate that it starts the session. If it is
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted)
msg = create_hdr("connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
else
msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
payload = json_object_new_object();
json_object_object_add(msg, "payload", payload);
health_alarms2json(localhost, local_buffer, 1);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "configured-alarms", tmp);
buffer_flush(local_buffer);
health_active_log_alarms_2json(localhost, local_buffer);
tmp = json_tokener_parse(local_buffer->buffer);
json_object_object_add(payload, "alarms-active", tmp);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
json_object_put(msg);
freez(msg_id);
buffer_free(local_buffer);
}
void aclk_hello_msg(mqtt_wss_client client)
{
json_object *tmp, *msg;
char *msg_id = create_uuid();
ACLK_SHARED_STATE_LOCK;
aclk_shared_state.version_neg = 0;
aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
ACLK_SHARED_STATE_UNLOCK;
//Hello message is versioned separatelly from the rest of the protocol
msg = create_hdr("hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
tmp = json_object_new_int(ACLK_VERSION_MIN);
json_object_object_add(msg, "min-version", tmp);
tmp = json_object_new_int(ACLK_VERSION_MAX);
json_object_object_add(msg, "max-version", tmp);
#ifdef ACLK_NG
tmp = json_object_new_string("Next Generation");
#else
tmp = json_object_new_string("Legacy");
#endif
json_object_object_add(msg, "aclk-implementation", tmp);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
json_object_put(msg);
freez(msg_id);
}
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
{
json_object *tmp, *msg;
msg = create_hdr("http", msg_id, 0, 0, 2);
tmp = json_object_new_int64(t_exec);
json_object_object_add(msg, "t-exec", tmp);
tmp = json_object_new_int64(created);
json_object_object_add(msg, "t-rx", tmp);
tmp = json_object_new_int(http_code);
json_object_object_add(msg, "http-code", tmp);
aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
json_object_put(msg);
}
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
{
json_object *msg, *payload;
BUFFER *tmp_buffer;
RRDSET *st;
st = rrdset_find(host, chart);
if (!st)
st = rrdset_find_byname(host, chart);
if (!st) {
info("FAILED to find chart %s", chart);
return;
}
tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
rrdset2json(st, tmp_buffer, NULL, NULL, 1);
payload = json_tokener_parse(tmp_buffer->buffer);
if (!payload) {
error("Failed to parse JSON from rrdset2json");
buffer_free(tmp_buffer);
return;
}
msg = create_hdr("chart", NULL, 0, 0, aclk_shared_state.version_neg);
json_object_object_add(msg, "payload", payload);
aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
buffer_free(tmp_buffer);
json_object_put(msg);
}
void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
{
// we create header here on purpose (and not send message with it already as `msg` param)
// one is version_neg is guaranteed to be done here
// other are timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
// send message with timestamps already to Query Queue they would be incorrect at time
// when query queue would get to send them)
json_object *obj = create_hdr("status-change", NULL, 0, 0, aclk_shared_state.version_neg);
json_object_object_add(obj, "payload", msg);
aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
json_object_put(obj);
}
/*
* Will generate disconnect message.
* @param message if NULL it will generate LWT message (unexpected).
* Otherwise string pointed to by this parameter will be used as
* reason.
*/
json_object *aclk_generate_disconnect(const char *message)
{
json_object *tmp, *msg;
msg = create_hdr("disconnect", NULL, 0, 0, 2);
tmp = json_object_new_string(message ? message : "unexpected");
json_object_object_add(msg, "payload", tmp);
return msg;
}
int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
{
int pid;
json_object *msg = aclk_generate_disconnect(message);
pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
json_object_put(msg);
return pid;
}
#ifndef __GNUC__
#pragma endregion
#endif

24
aclk/aclk_tx_msgs.h Normal file
View File

@@ -0,0 +1,24 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ACLK_TX_MSGS_H
#define ACLK_TX_MSGS_H
#include <json-c/json.h>
#include "libnetdata/libnetdata.h"
#include "../daemon/common.h"
#include "mqtt_wss_client.h"
void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted);
void aclk_hello_msg(mqtt_wss_client client);
void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len);
void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart);
void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg);
json_object *aclk_generate_disconnect(const char *message);
int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message);
#endif

347
aclk/aclk_util.c Normal file
View File

@@ -0,0 +1,347 @@
#include "aclk_util.h"
#include <stdio.h>
#include "../daemon/common.h"
// CentOS 7 has older version that doesn't define this
// same goes for MacOS
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
#endif
#ifdef ACLK_LOG_CONVERSATION_DIR
volatile int aclk_conversation_log_counter = 0;
#if !defined(HAVE_C___ATOMIC) || defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
netdata_mutex_t aclk_conversation_log_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_get_conv_log_next()
{
int ret;
netdata_mutex_lock(&aclk_conversation_log_mutex);
ret = aclk_conversation_log_counter++;
netdata_mutex_unlock(&aclk_conversation_log_mutex);
return ret;
}
#endif
#endif
#define ACLK_TOPIC_PREFIX "/agent/"
struct aclk_topic {
const char *topic_suffix;
char *topic;
};
// This helps to cache finalized topics (assembled with claim_id)
// to not have to alloc or create buffer and construct topic every
// time message is sent as in old ACLK
static struct aclk_topic aclk_topic_cache[] = {
{ .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_CHART
{ .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS
{ .topic_suffix = "outbound/meta", .topic = NULL }, // ACLK_TOPICID_METADATA
{ .topic_suffix = "inbound/cmd", .topic = NULL }, // ACLK_TOPICID_COMMAND
{ .topic_suffix = NULL, .topic = NULL }
};
void free_topic_cache(void)
{
struct aclk_topic *tc = aclk_topic_cache;
while (tc->topic_suffix) {
if (tc->topic) {
freez(tc->topic);
tc->topic = NULL;
}
tc++;
}
}
static inline void generate_topic_cache(void)
{
struct aclk_topic *tc = aclk_topic_cache;
char *ptr;
if (unlikely(!tc->topic)) {
rrdhost_aclk_state_lock(localhost);
while(tc->topic_suffix) {
tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix));
ptr = tc->topic;
strcpy(ptr, ACLK_TOPIC_PREFIX);
ptr += strlen(ACLK_TOPIC_PREFIX);
strcpy(ptr, localhost->aclk_state.claimed_id);
ptr += (UUID_STR_LEN - 1);
*ptr++ = '/';
strcpy(ptr, tc->topic_suffix);
tc++;
}
rrdhost_aclk_state_unlock(localhost);
}
}
/*
* Build a topic based on sub_topic and final_topic
* if the sub topic starts with / assume that is an absolute topic
*
*/
const char *aclk_get_topic(enum aclk_topics topic)
{
generate_topic_cache();
return aclk_topic_cache[topic].topic;
}
int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
{
int pos = 0;
if (!strncmp("https://", url, 8)) {
pos = 8;
} else if (!strncmp("http://", url, 7)) {
error("Cannot connect ACLK over %s -> unencrypted link is not supported", url);
return 1;
}
int host_end = pos;
while (url[host_end] != 0 && url[host_end] != '/' && url[host_end] != ':')
host_end++;
if (url[host_end] == 0) {
*aclk_hostname = strdupz(url + pos);
*aclk_port = 443;
info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
return 0;
}
if (url[host_end] == ':') {
*aclk_hostname = callocz(host_end - pos + 1, 1);
strncpy(*aclk_hostname, url + pos, host_end - pos);
int port_end = host_end + 1;
while (url[port_end] >= '0' && url[port_end] <= '9')
port_end++;
if (port_end - host_end > 6) {
error("Port specified in %s is invalid", url);
freez(*aclk_hostname);
*aclk_hostname = NULL;
return 1;
}
*aclk_port = atoi(&url[host_end+1]);
}
if (url[host_end] == '/') {
*aclk_port = 443;
*aclk_hostname = callocz(1, host_end - pos + 1);
strncpy(*aclk_hostname, url+pos, host_end - pos);
}
info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
return 0;
}
/*
* TBEB with randomness
*
* @param mode 0 - to reset the delay,
* 1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
* @returns delay in ms
*
*/
#define ACLK_MAX_BACKOFF_DELAY 1024
unsigned long int aclk_reconnect_delay(int mode)
{
static int fail = -1;
unsigned long int delay;
if (!mode || fail == -1) {
srandom(time(NULL));
fail = mode - 1;
return 0;
}
delay = (1 << fail);
if (delay >= ACLK_MAX_BACKOFF_DELAY) {
delay = ACLK_MAX_BACKOFF_DELAY * 1000;
} else {
fail++;
delay *= 1000;
delay += (random() % (MAX(1000, delay/2)));
}
return delay;
}
#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
#define ACLK_PROXY_ENV "env"
#define ACLK_PROXY_CONFIG_VAR "proxy"
struct {
ACLK_PROXY_TYPE type;
const char *url_str;
} supported_proxy_types[] = {
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
};
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
{
switch (*type) {
case PROXY_DISABLED:
return "disabled";
case PROXY_TYPE_HTTP:
return "HTTP";
case PROXY_TYPE_SOCKS5:
return "SOCKS";
default:
return "Unknown";
}
}
static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
{
int i = 0;
while (supported_proxy_types[i].url_str) {
if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
return supported_proxy_types[i].type;
i++;
}
return PROXY_TYPE_UNKNOWN;
}
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
{
if (!string)
return PROXY_TYPE_UNKNOWN;
while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove)
string++;
if (!*string)
return PROXY_TYPE_UNKNOWN;
return aclk_find_proxy(string);
}
// helper function to censor user&password
// for logging purposes
void safe_log_proxy_censor(char *proxy)
{
size_t length = strlen(proxy);
char *auth = proxy + length - 1;
char *cur;
while ((auth >= proxy) && (*auth != '@'))
auth--;
//if not found or @ is first char do nothing
if (auth <= proxy)
return;
cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
if (!cur)
cur = proxy;
else
cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
while (cur < auth) {
*cur = 'X';
cur++;
}
}
static inline void safe_log_proxy_error(char *str, const char *proxy)
{
char *log = strdupz(proxy);
safe_log_proxy_censor(log);
error("%s Provided Value:\"%s\"", str, log);
freez(log);
}
static inline int check_socks_enviroment(const char **proxy)
{
char *tmp = getenv("socks_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
tmp);
return 1;
}
static inline int check_http_enviroment(const char **proxy)
{
char *tmp = getenv("http_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
tmp);
return 1;
}
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
{
const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
*type = PROXY_DISABLED;
if (strcmp(proxy, "none") == 0)
return proxy;
if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
if (check_socks_enviroment(&proxy) == 0) {
#ifdef LWS_WITH_SOCKS5
*type = PROXY_TYPE_SOCKS5;
return proxy;
#else
safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
"but Libwebsockets used doesn't have SOCKS5 support built in. "
"Ignoring and checking for other options.",
proxy);
#endif
}
if (check_http_enviroment(&proxy) == 0)
*type = PROXY_TYPE_HTTP;
return proxy;
}
*type = aclk_verify_proxy(proxy);
#ifndef LWS_WITH_SOCKS5
if (*type == PROXY_TYPE_SOCKS5) {
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
proxy);
}
#endif
if (*type == PROXY_TYPE_UNKNOWN) {
*type = PROXY_DISABLED;
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
proxy);
}
return proxy;
}
// helper function to read settings only once (static)
// as claiming, challenge/response and ACLK
// read the same thing, no need to parse again
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
{
static const char *proxy = NULL;
static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
if (proxy_type == PROXY_NOT_SET)
proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
*type = proxy_type;
return proxy;
}

52
aclk/aclk_util.h Normal file
View File

@@ -0,0 +1,52 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ACLK_UTIL_H
#define ACLK_UTIL_H
#include "libnetdata/libnetdata.h"
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK
int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
enum aclk_topics {
ACLK_TOPICID_CHART = 0,
ACLK_TOPICID_ALARMS = 1,
ACLK_TOPICID_METADATA = 2,
ACLK_TOPICID_COMMAND = 3,
};
const char *aclk_get_topic(enum aclk_topics topic);
void free_topic_cache(void);
// TODO
// aclk_topics_reload //when claim id changes
#ifdef ACLK_LOG_CONVERSATION_DIR
extern volatile int aclk_conversation_log_counter;
#if defined(HAVE_C___ATOMIC) && !defined(NETDATA_NO_ATOMIC_INSTRUCTIONS)
#define ACLK_GET_CONV_LOG_NEXT() __atomic_fetch_add(&aclk_conversation_log_counter, 1, __ATOMIC_SEQ_CST)
#else
extern netdata_mutex_t aclk_conversation_log_mutex;
int aclk_get_conv_log_next();
#define ACLK_GET_CONV_LOG_NEXT() aclk_get_conv_log_next()
#endif
#endif
unsigned long int aclk_reconnect_delay(int mode);
typedef enum aclk_proxy_type {
PROXY_TYPE_UNKNOWN = 0,
PROXY_TYPE_SOCKS5,
PROXY_TYPE_HTTP,
PROXY_DISABLED,
PROXY_NOT_SET,
} ACLK_PROXY_TYPE;
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
#endif /* ACLK_UTIL_H */

View File

@@ -3,7 +3,11 @@
#define ACLK_LWS_HTTPS_CLIENT_INTERNAL
#include "aclk_lws_https_client.h"
#ifndef ACLK_NG
#include "aclk_common.h"
#else
#include "../aclk.h"
#endif
#include "aclk_lws_wss_client.h"

View File

@@ -10,7 +10,7 @@
-e 's#[@]registrydir_POST@#$(registrydir)#g' \
-e 's#[@]varlibdir_POST@#$(varlibdir)#g' \
-e 's#[@]webdir_POST@#$(webdir)#g' \
-e 's#[@]can_enable_aclk_POST@#$(can_enable_aclk)#g' \
-e 's#[@]enable_aclk_POST@#$(enable_aclk)#g' \
-e 's#[@]enable_cloud_POST@#$(enable_cloud)#g' \
$< > $@.tmp; then \
mv "$@.tmp" "$@"; \

View File

@@ -2,7 +2,11 @@
#include "claim.h"
#include "../registry/registry_internals.h"
#ifndef ACLK_NG
#include "../aclk/legacy/aclk_common.h"
#else
#include "../aclk/aclk.h"
#endif
char *claiming_pending_arguments = NULL;

View File

@@ -123,7 +123,7 @@ if [ "@enable_cloud_POST@" = "no" ]; then
exit 3
fi
# shellcheck disable=SC2050
if [ "@can_enable_aclk_POST@" != "yes" ]; then
if [ "@enable_aclk_POST@" != "yes" ]; then
echo >&2 "This agent was built without the dependencies for Cloud and cannot be claimed"
exit 3
fi

View File

@@ -197,6 +197,14 @@ AC_ARG_ENABLE(
[ enable_cloud="detect" ]
)
AC_ARG_WITH(
[aclk-ng],
[AS_HELP_STRING([--with-aclk-ng],
[Requires ACLK-NG to be used even in case ACLK Legacy can run on this system])],
[aclk_ng="$withval"],
[aclk_ng="fallback"]
)
if test "${enable_cloud}" = "no"; then
AC_DEFINE([DISABLE_CLOUD], [1], [disable netdata cloud functionality])
fi
@@ -632,7 +640,7 @@ AM_CONDITIONAL([ENABLE_CAPABILITY], [test "${with_libcap}" = "yes"])
AC_MSG_CHECKING([if cloud functionality should be enabled])
AC_MSG_RESULT([${enable_cloud}])
if test "$enable_cloud" != "no"; then
if test "$enable_cloud" != "no" -a "$aclk_ng" != "yes"; then
# just to have all messages that can fail ACLK build in one place
# so it is easier to see why it can't be built
if test -n "${SSL_LIBS}"; then
@@ -706,6 +714,7 @@ if test "$enable_cloud" != "no"; then
fi
AC_MSG_RESULT([${can_enable_aclk}])
# TODO fix this (you need to try fallback)
test "${enable_cloud}" = "yes" -a "${can_enable_aclk}" = "no" && \
AC_MSG_ERROR([User required agent-cloud-link but it can't be built!])
@@ -715,7 +724,6 @@ if test "$enable_cloud" != "no"; then
else
enable_aclk=$enable_cloud
fi
AC_SUBST([can_enable_aclk])
if test "${enable_aclk}" = "yes"; then
AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK])
@@ -723,7 +731,35 @@ if test "$enable_cloud" != "no"; then
AC_MSG_RESULT([${enable_aclk}])
fi
if test "$enable_cloud" = "no" -a "$aclk_ng" = "yes"; then
AC_MSG_ERROR([--disable-cloud && --aclk-ng not allowed together (such configuration is self contradicting)])
fi
if test "$enable_cloud" != "no" -a "$aclk_ng" != "no"; then
AC_MSG_CHECKING([if JSON-C available for ACLK Next Generation])
if test "$enable_jsonc" != "yes"; then
AC_MSG_RESULT([no])
else
AC_MSG_RESULT([yes])
if test "$aclk_ng" != "yes" -a "$enable_aclk" == "no"; then #default "fallback"
AC_MSG_NOTICE([ACLK Legacy could not be built. Trying ACLK-NG as fallback.])
aclk_ng="yes"
fi
if test "$aclk_ng" = "yes"; then
#TODO Check OpenSSL and JSON-C
AC_MSG_CHECKING([if ACLK Next Generation can be built])
AC_DEFINE([ACLK_NG], [1], [ACLK Next Generation Should be used])
AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK])
enable_aclk="yes"
AC_MSG_RESULT([yes])
OPTIONAL_MQTT_WSS_CFLAGS="-Imqtt_websockets/src/include"
fi
fi
fi
AC_SUBST([enable_cloud])
AC_SUBST([enable_aclk])
AM_CONDITIONAL([ACLK_NG], [test "${aclk_ng}" = "yes"])
AM_CONDITIONAL([ENABLE_ACLK], [test "${enable_aclk}" = "yes"])
# -----------------------------------------------------------------------------
@@ -1420,7 +1456,8 @@ AC_SUBST([webdir])
CFLAGS="${CFLAGS} ${OPTIONAL_MATH_CFLAGS} ${OPTIONAL_NFACCT_CFLAGS} ${OPTIONAL_ZLIB_CFLAGS} ${OPTIONAL_UUID_CFLAGS} \
${OPTIONAL_LIBCAP_CFLAGS} ${OPTIONAL_IPMIMONITORING_CFLAGS} ${OPTIONAL_CUPS_CFLAGS} ${OPTIONAL_XENSTAT_FLAGS} \
${OPTIONAL_KINESIS_CFLAGS} ${OPTIONAL_PUBSUB_CFLAGS} ${OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS} \
${OPTIONAL_MONGOC_CFLAGS} ${LWS_CFLAGS} ${OPTIONAL_JSONC_STATIC_CFLAGS} ${OPTIONAL_BPF_CFLAGS} ${OPTIONAL_JUDY_CFLAGS}"
${OPTIONAL_MONGOC_CFLAGS} ${LWS_CFLAGS} ${OPTIONAL_JSONC_STATIC_CFLAGS} ${OPTIONAL_BPF_CFLAGS} ${OPTIONAL_JUDY_CFLAGS} \
${OPTIONAL_MQTT_WSS_CFLAGS}"
CXXFLAGS="${CFLAGS} ${CXX11FLAG}"
@@ -1470,6 +1507,7 @@ AC_SUBST([OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS])
AC_SUBST([OPTIONAL_MONGOC_CFLAGS])
AC_SUBST([OPTIONAL_MONGOC_LIBS])
AC_SUBST([OPTIONAL_LWS_LIBS])
AC_SUBST([OPTIONAL_MQTT_WSS_CFLAGS])
# -----------------------------------------------------------------------------
# Check if cmocka is available - needed for unit testing
@@ -1615,6 +1653,7 @@ AC_CONFIG_FILES([
spawn/Makefile
parser/Makefile
])
AC_OUTPUT
test "${with_math}" != "yes" && AC_MSG_WARN([You are building without math. math allows accurate calculations. It should be enabled.]) || :

View File

@@ -8,7 +8,13 @@
#ifdef ENABLE_ACLK
#define FEAT_CLOUD 1
#define FEAT_CLOUD_MSG ""
#ifdef ACLK_NG
#define ACLK_IMPL "Next Generation"
#else
#define ACLK_IMPL "Legacy"
#endif
#else
#define ACLK_IMPL ""
#ifdef DISABLE_CLOUD
#define FEAT_CLOUD 0
#define FEAT_CLOUD_MSG "(by user request)"
@@ -62,6 +68,7 @@
#define FEAT_LIBCAP 0
#endif
#ifndef ACLK_NG
#ifdef ACLK_NO_LIBMOSQ
#define FEAT_MOSQUITTO 0
#else
@@ -83,6 +90,7 @@
#define FEAT_LWS_MSG "shared-lib"
#endif
#endif
#endif /* ACLK_NG */
#ifdef NETDATA_WITH_ZLIB
#define FEAT_ZLIB 1
@@ -199,6 +207,9 @@ void print_build_info(void) {
printf(" dbengine: %s\n", FEAT_YES_NO(FEAT_DBENGINE));
printf(" Native HTTPS: %s\n", FEAT_YES_NO(FEAT_NATIVE_HTTPS));
printf(" Netdata Cloud: %s %s\n", FEAT_YES_NO(FEAT_CLOUD), FEAT_CLOUD_MSG);
#if FEAT_CLOUD == 1
printf(" Cloud Implementation: %s\n", ACLK_IMPL);
#endif
printf(" TLS Host Verification: %s\n", FEAT_YES_NO(FEAT_TLS_HOST_VERIFY));
printf("Libraries:\n");
@@ -207,12 +218,14 @@ void print_build_info(void) {
printf(" libcap: %s\n", FEAT_YES_NO(FEAT_LIBCAP));
printf(" libcrypto: %s\n", FEAT_YES_NO(FEAT_CRYPTO));
printf(" libm: %s\n", FEAT_YES_NO(FEAT_LIBM));
#ifndef ACLK_NG
#if defined(ENABLE_ACLK)
printf(" LWS: %s %s v%d.%d.%d\n", FEAT_YES_NO(FEAT_LWS), FEAT_LWS_MSG, LWS_LIBRARY_VERSION_MAJOR, LWS_LIBRARY_VERSION_MINOR, LWS_LIBRARY_VERSION_PATCH);
#else
printf(" LWS: %s %s\n", FEAT_YES_NO(FEAT_LWS), FEAT_LWS_MSG);
#endif
printf(" mosquitto: %s\n", FEAT_YES_NO(FEAT_MOSQUITTO));
#endif
printf(" tcalloc: %s\n", FEAT_YES_NO(FEAT_TCMALLOC));
printf(" zlib: %s\n", FEAT_YES_NO(FEAT_ZLIB));
@@ -251,6 +264,9 @@ void print_build_info_json(void) {
printf(" \"cloud-disabled\": true,\n");
#else
printf(" \"cloud-disabled\": false,\n");
#endif
#if FEAT_CLOUD == 1
printf(" \"cloud-implementation\": \"%s\",\n", ACLK_IMPL);
#endif
printf(" \"tls-host-verify\": %s\n", FEAT_JSON_BOOL(FEAT_TLS_HOST_VERIFY));
printf(" },\n");
@@ -261,6 +277,7 @@ void print_build_info_json(void) {
printf(" \"libcap\": %s,\n", FEAT_JSON_BOOL(FEAT_LIBCAP));
printf(" \"libcrypto\": %s,\n", FEAT_JSON_BOOL(FEAT_CRYPTO));
printf(" \"libm\": %s,\n", FEAT_JSON_BOOL(FEAT_LIBM));
#ifndef ACLK_NG
#if defined(ENABLE_ACLK)
printf(" \"lws\": %s,\n", FEAT_JSON_BOOL(FEAT_LWS));
printf(" \"lws-version\": \"%d.%d.%d\",\n", LWS_LIBRARY_VERSION_MAJOR, LWS_LIBRARY_VERSION_MINOR, LWS_LIBRARY_VERSION_PATCH);
@@ -269,6 +286,7 @@ void print_build_info_json(void) {
printf(" \"lws\": %s,\n", FEAT_JSON_BOOL(FEAT_LWS));
#endif
printf(" \"mosquitto\": %s,\n", FEAT_JSON_BOOL(FEAT_MOSQUITTO));
#endif
printf(" \"tcmalloc\": %s,\n", FEAT_JSON_BOOL(FEAT_TCMALLOC));
printf(" \"zlib\": %s\n", FEAT_JSON_BOOL(FEAT_ZLIB));
printf(" },\n");

View File

@@ -66,7 +66,11 @@
#include "claim/claim.h"
// netdata agent cloud link
#ifndef ACLK_NG
#include "aclk/legacy/agent_cloud_link.h"
#else
#include "aclk/aclk.h"
#endif
// global GUID map functions

View File

@@ -80,7 +80,7 @@ struct netdata_static_thread static_threads[] = {
NETDATA_PLUGIN_HOOK_IDLEJITTER
NETDATA_PLUGIN_HOOK_STATSD
#ifdef ENABLE_ACLK
#if defined(ENABLE_ACLK) || defined(ACLK_NG)
NETDATA_ACLK_HOOK
#endif

View File

@@ -34,7 +34,12 @@ struct pg_cache_page_index;
#include "rrdcalc.h"
#include "rrdcalctemplate.h"
#include "../streaming/rrdpush.h"
#ifndef ACLK_NG
#include "../aclk/legacy/aclk_rrdhost_state.h"
#else
#include "aclk/aclk.h"
#endif
struct context_param {
RRDDIM *rd;

View File

@@ -2,7 +2,7 @@
#include "health.h"
static inline void health_string2json(BUFFER *wb, const char *prefix, const char *label, const char *value, const char *suffix) {
void health_string2json(BUFFER *wb, const char *prefix, const char *label, const char *value, const char *suffix) {
if(value && *value) {
buffer_sprintf(wb, "%s\"%s\":\"", prefix, label);
buffer_strcat_htmlescape(wb, value);
@@ -13,7 +13,7 @@ static inline void health_string2json(BUFFER *wb, const char *prefix, const char
buffer_sprintf(wb, "%s\"%s\":null%s", prefix, label, suffix);
}
inline void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host) {
void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host) {
buffer_sprintf(wb,
"\n\t{\n"
"\t\t\"hostname\": \"%s\",\n"

1
mqtt_websockets Submodule

Submodule mqtt_websockets added at 62a4af9cc8

View File

@@ -319,6 +319,10 @@ while [ -n "${1}" ]; do
"--disable-go") NETDATA_DISABLE_GO=1 ;;
"--enable-ebpf") NETDATA_DISABLE_EBPF=0 ;;
"--disable-ebpf") NETDATA_DISABLE_EBPF=1 NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--disable-ebpf/} --disable-ebpf" ;;
"--aclk-ng")
NETDATA_ACLK_NG=1
NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--with-aclk-ng/} --with-aclk-ng"
;;
"--disable-cloud")
if [ -n "${NETDATA_REQUIRE_CLOUD}" ]; then
echo "Cloud explicitly enabled, ignoring --disable-cloud."
@@ -567,8 +571,8 @@ copy_libmosquitto() {
}
bundle_libmosquitto() {
if [ -n "${NETDATA_DISABLE_CLOUD}" ]; then
echo "Skipping cloud"
if [ -n "${NETDATA_DISABLE_CLOUD}" ] || [ -n "${NETDATA_ACLK_NG}" ]; then
echo "Skipping libmosquitto"
return 0
fi
@@ -664,7 +668,7 @@ copy_libwebsockets() {
}
bundle_libwebsockets() {
if [ -n "${NETDATA_DISABLE_CLOUD}" ] || [ -n "${USE_SYSTEM_LWS}" ]; then
if [ -n "${NETDATA_DISABLE_CLOUD}" ] || [ -n "${USE_SYSTEM_LWS}" ] || [ -n "${NETDATA_ACLK_NG}" ]; then
return 0
fi

View File

@@ -440,7 +440,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
cd.version = rpt->stream_version;
#ifdef ENABLE_ACLK
#if defined(ENABLE_ACLK) && !defined(ACLK_NG)
// in case we have cloud connection we inform cloud
// new slave connected
if (netdata_cloud_setting)
@@ -454,7 +454,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
rpt->client_port, count);
#ifdef ENABLE_ACLK
#if defined(ENABLE_ACLK) && !defined(ACLK_NG)
// in case we have cloud connection we inform cloud
// new slave connected
if (netdata_cloud_setting)

View File

@@ -961,6 +961,11 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
#ifdef ENABLE_ACLK
buffer_strcat(wb, "\t\"cloud-available\": true,\n");
#ifdef ACLK_NG
buffer_strcat(wb, "\t\"aclk-implementation\": \"Next Generation\",\n");
#else
buffer_strcat(wb, "\t\"aclk-implementation\": \"legacy\",\n");
#endif
#else
buffer_strcat(wb, "\t\"cloud-available\": false,\n");
#endif