mirror of
https://github.com/netdata/netdata.git
synced 2021-06-06 23:03:21 +03:00
Improve the impact of health code on netdata scalability (#8407)
* Add support for spawning processes without pipes. * Port health_alarm_execute() from mypopen() to netdata_spawn() * Make alarm notifications asynchronous within a single health thread iteration * Initial version of spawn server. * preliminary integration of spawn client with health
This commit is contained in:
committed by
GitHub
parent
a606a27f16
commit
6393b2f535
@@ -641,6 +641,13 @@ set(ACLK_PLUGIN_FILES
|
||||
aclk/mqtt.h
|
||||
)
|
||||
|
||||
set(SPAWN_PLUGIN_FILES
|
||||
spawn/spawn.c
|
||||
spawn/spawn_server.c
|
||||
spawn/spawn_client.c
|
||||
spawn/spawn.h
|
||||
)
|
||||
|
||||
set(ACLK_STATIC_LIBS
|
||||
${CMAKE_SOURCE_DIR}/externaldeps/mosquitto/libmosquitto.a
|
||||
${CMAKE_SOURCE_DIR}/externaldeps/libwebsockets/libwebsockets.a
|
||||
@@ -741,6 +748,7 @@ set(NETDATA_FILES
|
||||
${STREAMING_PLUGIN_FILES}
|
||||
${WEB_PLUGIN_FILES}
|
||||
${CLAIM_PLUGIN_FILES}
|
||||
${SPAWN_PLUGIN_FILES}
|
||||
)
|
||||
|
||||
set(NETDATACLI_FILES
|
||||
|
||||
@@ -112,6 +112,7 @@ SUBDIRS += \
|
||||
web \
|
||||
claim \
|
||||
aclk \
|
||||
spawn \
|
||||
$(NULL)
|
||||
|
||||
|
||||
@@ -497,6 +498,13 @@ endif
|
||||
|
||||
|
||||
|
||||
SPAWN_PLUGIN_FILES = \
|
||||
spawn/spawn.c \
|
||||
spawn/spawn_server.c \
|
||||
spawn/spawn_client.c \
|
||||
spawn/spawn.h \
|
||||
$(NULL)
|
||||
|
||||
EXPORTING_ENGINE_FILES = \
|
||||
exporting/exporting_engine.c \
|
||||
exporting/exporting_engine.h \
|
||||
@@ -595,6 +603,7 @@ NETDATA_FILES = \
|
||||
$(WEB_PLUGIN_FILES) \
|
||||
$(CLAIM_FILES) \
|
||||
$(ACLK_FILES) \
|
||||
$(SPAWN_PLUGIN_FILES) \
|
||||
$(NULL)
|
||||
|
||||
if FREEBSD
|
||||
|
||||
@@ -1490,6 +1490,7 @@ AC_CONFIG_FILES([
|
||||
web/server/static/Makefile
|
||||
claim/Makefile
|
||||
aclk/Makefile
|
||||
spawn/Makefile
|
||||
])
|
||||
AC_OUTPUT
|
||||
|
||||
|
||||
@@ -68,6 +68,9 @@
|
||||
// netdata agent cloud link
|
||||
#include "aclk/agent_cloud_link.h"
|
||||
|
||||
// netdata agent spawn server
|
||||
#include "spawn/spawn.h"
|
||||
|
||||
// the netdata deamon
|
||||
#include "daemon.h"
|
||||
#include "main.h"
|
||||
|
||||
@@ -906,6 +906,11 @@ int main(int argc, char **argv) {
|
||||
else i++;
|
||||
}
|
||||
}
|
||||
if (argc > 1 && strcmp(argv[1], SPAWN_SERVER_COMMAND_LINE_ARGUMENT) == 0) {
|
||||
// don't run netdata, this is the spawn server
|
||||
spawn_server();
|
||||
exit(0);
|
||||
}
|
||||
|
||||
// parse options
|
||||
{
|
||||
@@ -1377,6 +1382,9 @@ int main(int argc, char **argv) {
|
||||
|
||||
netdata_threads_init_after_fork((size_t)config_get_number(CONFIG_SECTION_GLOBAL, "pthread stack size", (long)default_stacksize));
|
||||
|
||||
// fork the spawn server
|
||||
spawn_init();
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// initialize rrd, registry, health, rrdpush, etc.
|
||||
|
||||
|
||||
@@ -576,6 +576,7 @@ struct alarm_entry {
|
||||
char *recipient;
|
||||
time_t exec_run_timestamp;
|
||||
int exec_code;
|
||||
uint64_t exec_spawn_serial;
|
||||
|
||||
char *source;
|
||||
char *units;
|
||||
@@ -601,6 +602,8 @@ struct alarm_entry {
|
||||
time_t last_repeat;
|
||||
|
||||
struct alarm_entry *next;
|
||||
struct alarm_entry *next_in_progress;
|
||||
struct alarm_entry *prev_in_progress;
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -11,6 +11,46 @@ struct health_cmdapi_thread_status {
|
||||
unsigned int default_health_enabled = 1;
|
||||
char *silencers_filename;
|
||||
|
||||
// the queue of executed alarm notifications that haven't been waited for yet
|
||||
static struct {
|
||||
ALARM_ENTRY *head; // oldest
|
||||
ALARM_ENTRY *tail; // latest
|
||||
} alarm_notifications_in_progress = {NULL, NULL};
|
||||
|
||||
static inline void enqueue_alarm_notify_in_progress(ALARM_ENTRY *ae)
|
||||
{
|
||||
ae->prev_in_progress = NULL;
|
||||
ae->next_in_progress = NULL;
|
||||
|
||||
if (NULL != alarm_notifications_in_progress.tail) {
|
||||
ae->prev_in_progress = alarm_notifications_in_progress.tail;
|
||||
alarm_notifications_in_progress.tail->next_in_progress = ae;
|
||||
}
|
||||
if (NULL == alarm_notifications_in_progress.head) {
|
||||
alarm_notifications_in_progress.head = ae;
|
||||
}
|
||||
alarm_notifications_in_progress.tail = ae;
|
||||
|
||||
}
|
||||
|
||||
static inline void unlink_alarm_notify_in_progress(ALARM_ENTRY *ae)
|
||||
{
|
||||
struct alarm_entry *prev = ae->prev_in_progress;
|
||||
struct alarm_entry *next = ae->next_in_progress;
|
||||
|
||||
if (NULL != prev) {
|
||||
prev->next_in_progress = next;
|
||||
}
|
||||
if (NULL != next) {
|
||||
next->prev_in_progress = prev;
|
||||
}
|
||||
if (ae == alarm_notifications_in_progress.head) {
|
||||
alarm_notifications_in_progress.head = next;
|
||||
}
|
||||
if (ae == alarm_notifications_in_progress.tail) {
|
||||
alarm_notifications_in_progress.tail = prev;
|
||||
}
|
||||
}
|
||||
// ----------------------------------------------------------------------------
|
||||
// health initialization
|
||||
|
||||
@@ -265,7 +305,6 @@ static inline void health_alarm_execute(RRDHOST *host, ALARM_ENTRY *ae) {
|
||||
}
|
||||
|
||||
static char command_to_run[ALARM_EXEC_COMMAND_LENGTH + 1];
|
||||
pid_t command_pid;
|
||||
|
||||
const char *exec = (ae->exec) ? ae->exec : host->health_default_exec;
|
||||
const char *recipient = (ae->recipient) ? ae->recipient : host->health_default_recipient;
|
||||
@@ -321,25 +360,30 @@ static inline void health_alarm_execute(RRDHOST *host, ALARM_ENTRY *ae) {
|
||||
);
|
||||
|
||||
ae->flags |= HEALTH_ENTRY_FLAG_EXEC_RUN;
|
||||
ae->exec_run_timestamp = now_realtime_sec();
|
||||
ae->exec_run_timestamp = now_realtime_sec(); /* will be updated by real time after spawning */
|
||||
|
||||
debug(D_HEALTH, "executing command '%s'", command_to_run);
|
||||
FILE *fp = mypopen(command_to_run, &command_pid);
|
||||
if(!fp) {
|
||||
error("HEALTH: Cannot popen(\"%s\", \"r\").", command_to_run);
|
||||
goto done;
|
||||
}
|
||||
debug(D_HEALTH, "HEALTH reading from command (discarding command's output)");
|
||||
char buffer[100 + 1];
|
||||
while(fgets(buffer, 100, fp) != NULL) ;
|
||||
ae->exec_code = mypclose(fp, command_pid);
|
||||
ae->flags |= HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS;
|
||||
ae->exec_spawn_serial = spawn_enq_cmd(command_to_run);
|
||||
enqueue_alarm_notify_in_progress(ae);
|
||||
|
||||
return; //health_alarm_wait_for_execution
|
||||
done:
|
||||
health_alarm_log_save(host, ae);
|
||||
}
|
||||
|
||||
static inline void health_alarm_wait_for_execution(ALARM_ENTRY *ae) {
|
||||
if (!(ae->flags & HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS))
|
||||
return;
|
||||
|
||||
spawn_wait_cmd(ae->exec_spawn_serial, &ae->exec_code, &ae->exec_run_timestamp);
|
||||
debug(D_HEALTH, "done executing command - returned with code %d", ae->exec_code);
|
||||
ae->flags &= ~HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS;
|
||||
|
||||
if(ae->exec_code != 0)
|
||||
ae->flags |= HEALTH_ENTRY_FLAG_EXEC_FAILED;
|
||||
|
||||
done:
|
||||
health_alarm_log_save(host, ae);
|
||||
unlink_alarm_notify_in_progress(ae);
|
||||
}
|
||||
|
||||
static inline void health_process_notifications(RRDHOST *host, ALARM_ENTRY *ae) {
|
||||
@@ -401,6 +445,7 @@ static inline void health_alarm_log_process(RRDHOST *host) {
|
||||
ALARM_ENTRY *t = ae->next;
|
||||
|
||||
if(likely(!alarm_entry_isrepeating(host, ae))) {
|
||||
health_alarm_wait_for_execution(ae);
|
||||
health_alarm_log_free_one_nochecks_nounlink(ae);
|
||||
host->health_log.count--;
|
||||
}
|
||||
@@ -945,6 +990,7 @@ void *health_main(void *ptr) {
|
||||
rc->rrdcalc_flags |= RRDCALC_FLAG_RUN_ONCE;
|
||||
health_process_notifications(host, ae);
|
||||
debug(D_HEALTH, "Notification sent for the repeating alarm %u.", ae->alarm_id);
|
||||
health_alarm_wait_for_execution(ae);
|
||||
health_alarm_log_free_one_nochecks_nounlink(ae);
|
||||
}
|
||||
}
|
||||
@@ -959,11 +1005,23 @@ void *health_main(void *ptr) {
|
||||
// and cleanup
|
||||
health_alarm_log_process(host);
|
||||
|
||||
if (unlikely(netdata_exit))
|
||||
if (unlikely(netdata_exit)) {
|
||||
// wait for all notifications to finish before allowing health to be cleaned up
|
||||
ALARM_ENTRY *ae;
|
||||
while (NULL != (ae = alarm_notifications_in_progress.head)) {
|
||||
health_alarm_wait_for_execution(ae);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
} /* rrdhost_foreach */
|
||||
|
||||
// wait for all notifications to finish before allowing health to be cleaned up
|
||||
ALARM_ENTRY *ae;
|
||||
while (NULL != (ae = alarm_notifications_in_progress.head)) {
|
||||
health_alarm_wait_for_execution(ae);
|
||||
}
|
||||
|
||||
rrd_unlock();
|
||||
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ extern unsigned int default_health_enabled;
|
||||
#define HEALTH_ENTRY_FLAG_EXEC_FAILED 0x00000008
|
||||
#define HEALTH_ENTRY_FLAG_SILENCED 0x00000010
|
||||
#define HEALTH_ENTRY_RUN_ONCE 0x00000020
|
||||
#define HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS 0x00000040
|
||||
|
||||
#define HEALTH_ENTRY_FLAG_SAVED 0x10000000
|
||||
#define HEALTH_ENTRY_FLAG_NO_CLEAR_NOTIFICATION 0x80000000
|
||||
|
||||
@@ -78,8 +78,16 @@ static void myp_del(pid_t pid) {
|
||||
#define PIPE_READ 0
|
||||
#define PIPE_WRITE 1
|
||||
|
||||
static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, char **env) {
|
||||
FILE *fp;
|
||||
/* custom_popene flag definitions */
|
||||
#define FLAG_CREATE_PIPE 1 // Create a pipe like popen() when set, otherwise set stdout to /dev/null
|
||||
#define FLAG_CLOSE_FD 2 // Close all file descriptors other than STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO
|
||||
|
||||
/*
|
||||
* Returns -1 on failure, 0 on success. When FLAG_CREATE_PIPE is set, on success set the FILE *fp pointer.
|
||||
*/
|
||||
static inline int custom_popene(const char *command, volatile pid_t *pidptr, char **env, uint8_t flags, FILE **fpp) {
|
||||
FILE *fp = NULL;
|
||||
int ret = 0; // success by default
|
||||
int pipefd[2], error;
|
||||
pid_t pid;
|
||||
char *const spawn_argv[] = {
|
||||
@@ -91,23 +99,36 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c
|
||||
posix_spawnattr_t attr;
|
||||
posix_spawn_file_actions_t fa;
|
||||
|
||||
if (pipe(pipefd) == -1)
|
||||
return NULL;
|
||||
if ((fp = fdopen(pipefd[PIPE_READ], "r")) == NULL) {
|
||||
goto error_after_pipe;
|
||||
if (flags & FLAG_CREATE_PIPE) {
|
||||
if (pipe(pipefd) == -1)
|
||||
return -1;
|
||||
if ((fp = fdopen(pipefd[PIPE_READ], "r")) == NULL) {
|
||||
goto error_after_pipe;
|
||||
}
|
||||
}
|
||||
|
||||
// Mark all files to be closed by the exec() stage of posix_spawn()
|
||||
int i;
|
||||
for (i = (int) (sysconf(_SC_OPEN_MAX) - 1); i >= 0; i--)
|
||||
if(i != STDIN_FILENO && i != STDERR_FILENO)
|
||||
(void)fcntl(i, F_SETFD, FD_CLOEXEC);
|
||||
if (flags & FLAG_CLOSE_FD) {
|
||||
// Mark all files to be closed by the exec() stage of posix_spawn()
|
||||
int i;
|
||||
for (i = (int) (sysconf(_SC_OPEN_MAX) - 1); i >= 0; i--) {
|
||||
if (i != STDIN_FILENO && i != STDERR_FILENO)
|
||||
(void) fcntl(i, F_SETFD, FD_CLOEXEC);
|
||||
}
|
||||
}
|
||||
|
||||
if (!posix_spawn_file_actions_init(&fa)) {
|
||||
// move the pipe to stdout in the child
|
||||
if (posix_spawn_file_actions_adddup2(&fa, pipefd[PIPE_WRITE], STDOUT_FILENO)) {
|
||||
error("posix_spawn_file_actions_adddup2() failed");
|
||||
goto error_after_posix_spawn_file_actions_init;
|
||||
if (flags & FLAG_CREATE_PIPE) {
|
||||
// move the pipe to stdout in the child
|
||||
if (posix_spawn_file_actions_adddup2(&fa, pipefd[PIPE_WRITE], STDOUT_FILENO)) {
|
||||
error("posix_spawn_file_actions_adddup2() failed");
|
||||
goto error_after_posix_spawn_file_actions_init;
|
||||
}
|
||||
} else {
|
||||
// set stdout to /dev/null
|
||||
if (posix_spawn_file_actions_addopen(&fa, STDOUT_FILENO, "/dev/null", O_WRONLY, 0)) {
|
||||
error("posix_spawn_file_actions_addopen() failed");
|
||||
// this is not a fatal error
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error("posix_spawn_file_actions_init() failed.");
|
||||
@@ -136,10 +157,16 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c
|
||||
} else {
|
||||
myp_add_unlock();
|
||||
error("Failed to spawn command: '%s' from parent pid %d.", command, getpid());
|
||||
fclose(fp);
|
||||
fp = NULL;
|
||||
if (flags & FLAG_CREATE_PIPE) {
|
||||
fclose(fp);
|
||||
}
|
||||
ret = -1;
|
||||
}
|
||||
if (flags & FLAG_CREATE_PIPE) {
|
||||
close(pipefd[PIPE_WRITE]);
|
||||
if (0 == ret) // on success set FILE * pointer
|
||||
*fpp = fp;
|
||||
}
|
||||
close(pipefd[PIPE_WRITE]);
|
||||
|
||||
if (!error) {
|
||||
// posix_spawnattr_init() succeeded
|
||||
@@ -149,19 +176,21 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c
|
||||
if (posix_spawn_file_actions_destroy(&fa))
|
||||
error("posix_spawn_file_actions_destroy");
|
||||
|
||||
return fp;
|
||||
return ret;
|
||||
|
||||
error_after_posix_spawn_file_actions_init:
|
||||
if (posix_spawn_file_actions_destroy(&fa))
|
||||
error("posix_spawn_file_actions_destroy");
|
||||
error_after_pipe:
|
||||
if (fp)
|
||||
fclose(fp);
|
||||
else
|
||||
close(pipefd[PIPE_READ]);
|
||||
if (flags & FLAG_CREATE_PIPE) {
|
||||
if (fp)
|
||||
fclose(fp);
|
||||
else
|
||||
close(pipefd[PIPE_READ]);
|
||||
|
||||
close(pipefd[PIPE_WRITE]);
|
||||
return NULL;
|
||||
close(pipefd[PIPE_WRITE]);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
// See man environ
|
||||
@@ -222,26 +251,37 @@ int myp_reap(pid_t pid) {
|
||||
}
|
||||
|
||||
FILE *mypopen(const char *command, volatile pid_t *pidptr) {
|
||||
return custom_popene(command, pidptr, environ);
|
||||
FILE *fp = NULL;
|
||||
(void)custom_popene(command, pidptr, environ, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp);
|
||||
return fp;
|
||||
}
|
||||
|
||||
FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env) {
|
||||
return custom_popene(command, pidptr, env);
|
||||
FILE *fp = NULL;
|
||||
(void)custom_popene(command, pidptr, env, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp);
|
||||
return fp;
|
||||
}
|
||||
|
||||
int mypclose(FILE *fp, pid_t pid) {
|
||||
// returns 0 on success, -1 on failure
|
||||
int netdata_spawn(const char *command, volatile pid_t *pidptr) {
|
||||
return custom_popene(command, pidptr, environ, 0, NULL);
|
||||
}
|
||||
|
||||
int custom_pclose(FILE *fp, pid_t pid) {
|
||||
int ret;
|
||||
siginfo_t info;
|
||||
|
||||
debug(D_EXIT, "Request to mypclose() on pid %d", pid);
|
||||
|
||||
// close the pipe fd
|
||||
// this is required in musl
|
||||
// without it the childs do not exit
|
||||
close(fileno(fp));
|
||||
if (fp) {
|
||||
// close the pipe fd
|
||||
// this is required in musl
|
||||
// without it the childs do not exit
|
||||
close(fileno(fp));
|
||||
|
||||
// close the pipe file pointer
|
||||
fclose(fp);
|
||||
// close the pipe file pointer
|
||||
fclose(fp);
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
|
||||
@@ -285,3 +325,13 @@ int mypclose(FILE *fp, pid_t pid) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mypclose(FILE *fp, pid_t pid)
|
||||
{
|
||||
return custom_pclose(fp, pid);
|
||||
}
|
||||
|
||||
int netdata_spawn_waitpid(pid_t pid)
|
||||
{
|
||||
return custom_pclose(NULL, pid);
|
||||
}
|
||||
@@ -11,6 +11,8 @@
|
||||
extern FILE *mypopen(const char *command, volatile pid_t *pidptr);
|
||||
extern FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env);
|
||||
extern int mypclose(FILE *fp, pid_t pid);
|
||||
extern int netdata_spawn(const char *command, volatile pid_t *pidptr);
|
||||
extern int netdata_spawn_waitpid(pid_t pid);
|
||||
extern void myp_init(void);
|
||||
extern void myp_free(void);
|
||||
extern int myp_reap(pid_t pid);
|
||||
|
||||
9
spawn/Makefile.am
Normal file
9
spawn/Makefile.am
Normal file
@@ -0,0 +1,9 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
AUTOMAKE_OPTIONS = subdir-objects
|
||||
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
|
||||
|
||||
dist_noinst_DATA = \
|
||||
README.md \
|
||||
$(NULL)
|
||||
|
||||
0
spawn/README.md
Normal file
0
spawn/README.md
Normal file
289
spawn/spawn.c
Normal file
289
spawn/spawn.c
Normal file
@@ -0,0 +1,289 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "spawn.h"
|
||||
#include "../database/engine/rrdenginelib.h"
|
||||
|
||||
static uv_thread_t thread;
|
||||
int spawn_thread_error;
|
||||
int spawn_thread_shutdown;
|
||||
|
||||
struct spawn_queue spawn_cmd_queue;
|
||||
|
||||
static struct spawn_cmd_info *create_spawn_cmd(char *command_to_run)
|
||||
{
|
||||
struct spawn_cmd_info *cmdinfo;
|
||||
|
||||
cmdinfo = mallocz(sizeof(*cmdinfo));
|
||||
assert(0 == uv_cond_init(&cmdinfo->cond));
|
||||
assert(0 == uv_mutex_init(&cmdinfo->mutex));
|
||||
cmdinfo->serial = 0; /* invalid */
|
||||
cmdinfo->command_to_run = strdupz(command_to_run);
|
||||
cmdinfo->exit_status = -1; /* invalid */
|
||||
cmdinfo->pid = -1; /* invalid */
|
||||
cmdinfo->flags = 0;
|
||||
|
||||
return cmdinfo;
|
||||
}
|
||||
|
||||
void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
|
||||
{
|
||||
uv_cond_destroy(&cmdinfo->cond);
|
||||
uv_mutex_destroy(&cmdinfo->mutex);
|
||||
|
||||
freez(cmdinfo->command_to_run);
|
||||
freez(cmdinfo);
|
||||
}
|
||||
|
||||
int spawn_cmd_compare(void *a, void *b)
|
||||
{
|
||||
struct spawn_cmd_info *cmda = a, *cmdb = b;
|
||||
|
||||
/* No need for mutex, serial will never change and the entries cannot be deallocated yet */
|
||||
if (cmda->serial < cmdb->serial) return -1;
|
||||
if (cmda->serial > cmdb->serial) return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void init_spawn_cmd_queue(void)
|
||||
{
|
||||
spawn_cmd_queue.cmd_tree.root = NULL;
|
||||
spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
|
||||
spawn_cmd_queue.size = 0;
|
||||
spawn_cmd_queue.latest_serial = 0;
|
||||
assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
|
||||
assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns serial number of the enqueued command
|
||||
*/
|
||||
uint64_t spawn_enq_cmd(char *command_to_run)
|
||||
{
|
||||
unsigned queue_size;
|
||||
uint64_t serial;
|
||||
avl *avl_ret;
|
||||
struct spawn_cmd_info *cmdinfo;
|
||||
|
||||
cmdinfo = create_spawn_cmd(command_to_run);
|
||||
|
||||
/* wait for free space in queue */
|
||||
uv_mutex_lock(&spawn_cmd_queue.mutex);
|
||||
while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
|
||||
uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
|
||||
}
|
||||
assert(queue_size < SPAWN_MAX_OUTSTANDING);
|
||||
spawn_cmd_queue.size = queue_size + 1;
|
||||
|
||||
serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
|
||||
cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */
|
||||
|
||||
/* enqueue command */
|
||||
avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo);
|
||||
assert(avl_ret == (avl *)cmdinfo);
|
||||
uv_mutex_unlock(&spawn_cmd_queue.mutex);
|
||||
|
||||
/* wake up event loop */
|
||||
assert(0 == uv_async_send(&spawn_async));
|
||||
return serial;
|
||||
}
|
||||
|
||||
/*
|
||||
* Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
|
||||
*/
|
||||
void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
|
||||
{
|
||||
avl *avl_ret;
|
||||
struct spawn_cmd_info tmp, *cmdinfo;
|
||||
|
||||
tmp.serial = serial;
|
||||
|
||||
uv_mutex_lock(&spawn_cmd_queue.mutex);
|
||||
avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl *)&tmp);
|
||||
uv_mutex_unlock(&spawn_cmd_queue.mutex);
|
||||
|
||||
assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
|
||||
cmdinfo = (struct spawn_cmd_info *)avl_ret;
|
||||
|
||||
uv_mutex_lock(&cmdinfo->mutex);
|
||||
while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
|
||||
/* Only 1 thread is allowed to wait for this command to finish */
|
||||
uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
|
||||
}
|
||||
uv_mutex_unlock(&cmdinfo->mutex);
|
||||
|
||||
spawn_deq_cmd(cmdinfo);
|
||||
*exit_status = cmdinfo->exit_status;
|
||||
*exec_run_timestamp = cmdinfo->exec_run_timestamp;
|
||||
|
||||
destroy_spawn_cmd(cmdinfo);
|
||||
}
|
||||
|
||||
void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
|
||||
{
|
||||
unsigned queue_size;
|
||||
avl *avl_ret;
|
||||
|
||||
uv_mutex_lock(&spawn_cmd_queue.mutex);
|
||||
queue_size = spawn_cmd_queue.size;
|
||||
assert(queue_size);
|
||||
/* dequeue command */
|
||||
avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo);
|
||||
assert(avl_ret);
|
||||
|
||||
spawn_cmd_queue.size = queue_size - 1;
|
||||
|
||||
/* wake up callers */
|
||||
uv_cond_signal(&spawn_cmd_queue.cond);
|
||||
uv_mutex_unlock(&spawn_cmd_queue.mutex);
|
||||
}
|
||||
|
||||
/*
|
||||
* Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the
|
||||
* only writer as far as struct spawn_cmd_info entries are concerned.
|
||||
*/
|
||||
static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
|
||||
{
|
||||
struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry;
|
||||
|
||||
if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) {
|
||||
*cmdinfop = cmdinfo;
|
||||
return -1; /* break tree traversal */
|
||||
}
|
||||
return 0; /* continue traversing */
|
||||
}
|
||||
|
||||
struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
|
||||
{
|
||||
struct spawn_cmd_info *cmdinfo;
|
||||
unsigned queue_size;
|
||||
int ret;
|
||||
|
||||
uv_mutex_lock(&spawn_cmd_queue.mutex);
|
||||
queue_size = spawn_cmd_queue.size;
|
||||
if (queue_size == 0) {
|
||||
uv_mutex_unlock(&spawn_cmd_queue.mutex);
|
||||
return NULL;
|
||||
}
|
||||
/* find command */
|
||||
cmdinfo = NULL;
|
||||
ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo);
|
||||
if (-1 != ret) { /* no commands available for processing */
|
||||
uv_mutex_unlock(&spawn_cmd_queue.mutex);
|
||||
return NULL;
|
||||
}
|
||||
uv_mutex_unlock(&spawn_cmd_queue.mutex);
|
||||
|
||||
return cmdinfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties.
|
||||
* The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
|
||||
* The caller has to be the netdata user as configured.
|
||||
*
|
||||
* @param loop the libuv loop of the caller context
|
||||
* @param spawn_channel the birectional libuv IPC pipe that the server and the caller will share
|
||||
* @param process the spawn server libuv process context
|
||||
* @return 0 on success or the libuv error code
|
||||
*/
|
||||
int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
|
||||
{
|
||||
uv_process_options_t options = {0};
|
||||
size_t exepath_size;
|
||||
char exepath[FILENAME_MAX];
|
||||
char *args[3];
|
||||
int ret;
|
||||
#define SPAWN_SERVER_DESCRIPTORS (3)
|
||||
uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS];
|
||||
|
||||
exepath_size = sizeof(exepath);
|
||||
ret = uv_exepath(exepath, &exepath_size);
|
||||
assert(ret == 0);
|
||||
|
||||
exepath[exepath_size] = '\0';
|
||||
args[0] = exepath;
|
||||
args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
|
||||
args[2] = NULL;
|
||||
|
||||
memset(&options, 0, sizeof(options));
|
||||
options.file = exepath;
|
||||
options.args = args;
|
||||
options.exit_cb = NULL; //exit_cb;
|
||||
options.stdio = stdio;
|
||||
options.stdio_count = SPAWN_SERVER_DESCRIPTORS;
|
||||
|
||||
stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
|
||||
stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */
|
||||
stdio[1].flags = UV_INHERIT_FD;
|
||||
stdio[1].data.fd = 1 /* UV_STDOUT_FD */;
|
||||
stdio[2].flags = UV_INHERIT_FD;
|
||||
stdio[2].data.fd = 2 /* UV_STDERR_FD */;
|
||||
|
||||
ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */
|
||||
assert(ret == 0);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
#define CONCURRENT_SPAWNS 16
|
||||
#define SPAWN_ITERATIONS 10000
|
||||
#undef CONCURRENT_STRESS_TEST
|
||||
|
||||
void spawn_init(void)
|
||||
{
|
||||
struct completion completion;
|
||||
int error;
|
||||
|
||||
info("Initializing spawn client.");
|
||||
|
||||
init_spawn_cmd_queue();
|
||||
|
||||
init_completion(&completion);
|
||||
error = uv_thread_create(&thread, spawn_client, &completion);
|
||||
if (error) {
|
||||
error("uv_thread_create(): %s", uv_strerror(error));
|
||||
goto after_error;
|
||||
}
|
||||
/* wait for spawn client thread to initialize */
|
||||
wait_for_completion(&completion);
|
||||
destroy_completion(&completion);
|
||||
uv_thread_set_name_np(thread, "DAEMON_SPAWN");
|
||||
|
||||
if (spawn_thread_error) {
|
||||
error = uv_thread_join(&thread);
|
||||
if (error) {
|
||||
error("uv_thread_create(): %s", uv_strerror(error));
|
||||
}
|
||||
goto after_error;
|
||||
}
|
||||
#ifdef CONCURRENT_STRESS_TEST
|
||||
signals_reset();
|
||||
signals_unblock();
|
||||
|
||||
sleep(60);
|
||||
uint64_t serial[CONCURRENT_SPAWNS];
|
||||
for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
|
||||
for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
|
||||
char cmd[64];
|
||||
sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
|
||||
serial[i] = spawn_enq_cmd(cmd);
|
||||
info("Queued command %s for spawning.", cmd);
|
||||
}
|
||||
int exit_status;
|
||||
time_t exec_run_timestamp;
|
||||
for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
|
||||
info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
|
||||
exec_run_timestamp);
|
||||
spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
|
||||
info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
|
||||
exec_run_timestamp);
|
||||
}
|
||||
}
|
||||
exit(0);
|
||||
#endif
|
||||
return;
|
||||
|
||||
after_error:
|
||||
error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
|
||||
}
|
||||
109
spawn/spawn.h
Normal file
109
spawn/spawn.h
Normal file
@@ -0,0 +1,109 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#ifndef NETDATA_SPAWN_H
|
||||
#define NETDATA_SPAWN_H 1
|
||||
|
||||
#include "../daemon/common.h"
|
||||
|
||||
#define SPAWN_SERVER_COMMAND_LINE_ARGUMENT "--special-spawn-server"
|
||||
|
||||
typedef enum spawn_protocol {
|
||||
SPAWN_PROT_EXEC_CMD = 0,
|
||||
SPAWN_PROT_SPAWN_RESULT,
|
||||
SPAWN_PROT_CMD_EXIT_STATUS
|
||||
} spawn_prot_t;
|
||||
|
||||
struct spawn_prot_exec_cmd {
|
||||
uint16_t command_length;
|
||||
char command_to_run[];
|
||||
};
|
||||
|
||||
struct spawn_prot_spawn_result {
|
||||
pid_t exec_pid; /* 0 if failed to spawn */
|
||||
time_t exec_run_timestamp; /* time of successfully spawning the command */
|
||||
};
|
||||
|
||||
struct spawn_prot_cmd_exit_status {
|
||||
int exec_exit_status;
|
||||
};
|
||||
|
||||
struct spawn_prot_header {
|
||||
spawn_prot_t opcode;
|
||||
void *handle;
|
||||
};
|
||||
|
||||
#undef SPAWN_DEBUG /* define to enable debug prints */
|
||||
|
||||
#define SPAWN_MAX_OUTSTANDING (32768)
|
||||
|
||||
#define SPAWN_CMD_PROCESSED 0x00000001
|
||||
#define SPAWN_CMD_IN_PROGRESS 0x00000002
|
||||
#define SPAWN_CMD_FAILED_TO_SPAWN 0x00000004
|
||||
#define SPAWN_CMD_DONE 0x00000008
|
||||
|
||||
struct spawn_cmd_info {
|
||||
avl avl;
|
||||
|
||||
/* concurrency control per command */
|
||||
uv_mutex_t mutex;
|
||||
uv_cond_t cond; /* users block here until command has finished */
|
||||
|
||||
uint64_t serial;
|
||||
char *command_to_run;
|
||||
int exit_status;
|
||||
pid_t pid;
|
||||
unsigned long flags;
|
||||
time_t exec_run_timestamp; /* time of successfully spawning the command */
|
||||
};
|
||||
|
||||
/* spawn command queue */
|
||||
struct spawn_queue {
|
||||
avl_tree cmd_tree;
|
||||
|
||||
/* concurrency control of command queue */
|
||||
uv_mutex_t mutex;
|
||||
uv_cond_t cond;
|
||||
|
||||
volatile unsigned size;
|
||||
uint64_t latest_serial;
|
||||
};
|
||||
|
||||
struct write_context {
|
||||
uv_write_t write_req;
|
||||
struct spawn_prot_header header;
|
||||
struct spawn_prot_cmd_exit_status exit_status;
|
||||
struct spawn_prot_spawn_result spawn_result;
|
||||
struct spawn_prot_exec_cmd payload;
|
||||
};
|
||||
|
||||
extern int spawn_thread_error;
|
||||
extern int spawn_thread_shutdown;
|
||||
extern uv_async_t spawn_async;
|
||||
|
||||
void spawn_init(void);
|
||||
void spawn_server(void);
|
||||
void spawn_client(void *arg);
|
||||
void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo);
|
||||
uint64_t spawn_enq_cmd(char *command_to_run);
|
||||
void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp);
|
||||
void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo);
|
||||
struct spawn_cmd_info *spawn_get_unprocessed_cmd(void);
|
||||
int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process);
|
||||
|
||||
/*
|
||||
* Copies from the source buffer to the protocol buffer. It advances the source buffer by the amount copied. It
|
||||
* subtracts the amount copied from the source length.
|
||||
*/
|
||||
static inline void copy_to_prot_buffer(char *prot_buffer, unsigned *prot_buffer_len, unsigned max_to_copy,
|
||||
char **source, unsigned *source_len)
|
||||
{
|
||||
unsigned to_copy;
|
||||
|
||||
to_copy = MIN(max_to_copy, *source_len);
|
||||
memcpy(prot_buffer + *prot_buffer_len, *source, to_copy);
|
||||
*prot_buffer_len += to_copy;
|
||||
*source += to_copy;
|
||||
*source_len -= to_copy;
|
||||
}
|
||||
|
||||
#endif //NETDATA_SPAWN_H
|
||||
241
spawn/spawn_client.c
Normal file
241
spawn/spawn_client.c
Normal file
@@ -0,0 +1,241 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "spawn.h"
|
||||
#include "../database/engine/rrdenginelib.h"
|
||||
|
||||
static uv_process_t process;
|
||||
static uv_pipe_t spawn_channel;
|
||||
static uv_loop_t *loop;
|
||||
uv_async_t spawn_async;
|
||||
|
||||
static char prot_buffer[MAX_COMMAND_LENGTH];
|
||||
static unsigned prot_buffer_len = 0;
|
||||
|
||||
static void async_cb(uv_async_t *handle)
|
||||
{
|
||||
uv_stop(handle->loop);
|
||||
}
|
||||
|
||||
static void after_pipe_write(uv_write_t* req, int status)
|
||||
{
|
||||
(void)status;
|
||||
#ifdef SPAWN_DEBUG
|
||||
info("CLIENT %s called status=%d", __func__, status);
|
||||
#endif
|
||||
freez(req->data);
|
||||
}
|
||||
|
||||
static void client_parse_spawn_protocol(unsigned source_len, char *source)
|
||||
{
|
||||
unsigned required_len;
|
||||
struct spawn_prot_header *header;
|
||||
struct spawn_prot_spawn_result *spawn_result;
|
||||
struct spawn_prot_cmd_exit_status *exit_status;
|
||||
struct spawn_cmd_info *cmdinfo;
|
||||
|
||||
while (source_len) {
|
||||
required_len = sizeof(*header);
|
||||
if (prot_buffer_len < required_len)
|
||||
copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
|
||||
if (prot_buffer_len < required_len)
|
||||
return; /* Source buffer ran out */
|
||||
|
||||
header = (struct spawn_prot_header *)prot_buffer;
|
||||
cmdinfo = (struct spawn_cmd_info *)header->handle;
|
||||
assert(NULL != cmdinfo);
|
||||
|
||||
switch(header->opcode) {
|
||||
case SPAWN_PROT_SPAWN_RESULT:
|
||||
required_len += sizeof(*spawn_result);
|
||||
if (prot_buffer_len < required_len)
|
||||
copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
|
||||
if (prot_buffer_len < required_len)
|
||||
return; /* Source buffer ran out */
|
||||
|
||||
spawn_result = (struct spawn_prot_spawn_result *)(header + 1);
|
||||
uv_mutex_lock(&cmdinfo->mutex);
|
||||
cmdinfo->pid = spawn_result->exec_pid;
|
||||
if (0 == cmdinfo->pid) { /* Failed to spawn */
|
||||
#ifdef SPAWN_DEBUG
|
||||
info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__);
|
||||
#endif
|
||||
cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE;
|
||||
uv_cond_signal(&cmdinfo->cond);
|
||||
} else {
|
||||
cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp;
|
||||
cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS;
|
||||
#ifdef SPAWN_DEBUG
|
||||
info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__);
|
||||
#endif
|
||||
}
|
||||
uv_mutex_unlock(&cmdinfo->mutex);
|
||||
prot_buffer_len = 0;
|
||||
break;
|
||||
case SPAWN_PROT_CMD_EXIT_STATUS:
|
||||
required_len += sizeof(*exit_status);
|
||||
if (prot_buffer_len < required_len)
|
||||
copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
|
||||
if (prot_buffer_len < required_len)
|
||||
return; /* Source buffer ran out */
|
||||
|
||||
exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1);
|
||||
uv_mutex_lock(&cmdinfo->mutex);
|
||||
cmdinfo->exit_status = exit_status->exec_exit_status;
|
||||
#ifdef SPAWN_DEBUG
|
||||
info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status);
|
||||
#endif
|
||||
cmdinfo->flags |= SPAWN_CMD_DONE;
|
||||
uv_cond_signal(&cmdinfo->cond);
|
||||
uv_mutex_unlock(&cmdinfo->mutex);
|
||||
prot_buffer_len = 0;
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf)
|
||||
{
|
||||
if (0 == nread) {
|
||||
info("%s: Zero bytes read from spawn pipe.", __func__);
|
||||
} else if (UV_EOF == nread) {
|
||||
info("EOF found in spawn pipe.");
|
||||
} else if (nread < 0) {
|
||||
error("%s: %s", __func__, uv_strerror(nread));
|
||||
}
|
||||
|
||||
if (nread < 0) { /* stop stream due to EOF or error */
|
||||
(void)uv_read_stop((uv_stream_t *)pipe);
|
||||
} else if (nread) {
|
||||
#ifdef SPAWN_DEBUG
|
||||
info("CLIENT %s read %u", __func__, (unsigned)nread);
|
||||
#endif
|
||||
client_parse_spawn_protocol(nread, buf->base);
|
||||
}
|
||||
if (buf && buf->len) {
|
||||
freez(buf->base);
|
||||
}
|
||||
|
||||
if (nread < 0) {
|
||||
uv_close((uv_handle_t *)pipe, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
static void on_read_alloc(uv_handle_t* handle,
|
||||
size_t suggested_size,
|
||||
uv_buf_t* buf)
|
||||
{
|
||||
(void)handle;
|
||||
buf->base = mallocz(suggested_size);
|
||||
buf->len = suggested_size;
|
||||
}
|
||||
|
||||
static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo)
|
||||
{
|
||||
int ret;
|
||||
uv_buf_t writebuf[3];
|
||||
struct write_context *write_ctx;
|
||||
|
||||
write_ctx = mallocz(sizeof(*write_ctx));
|
||||
write_ctx->write_req.data = write_ctx;
|
||||
|
||||
uv_mutex_lock(&cmdinfo->mutex);
|
||||
cmdinfo->flags |= SPAWN_CMD_PROCESSED;
|
||||
uv_mutex_unlock(&cmdinfo->mutex);
|
||||
|
||||
write_ctx->header.opcode = SPAWN_PROT_EXEC_CMD;
|
||||
write_ctx->header.handle = cmdinfo;
|
||||
write_ctx->payload.command_length = strlen(cmdinfo->command_to_run);
|
||||
|
||||
writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header));
|
||||
writebuf[1] = uv_buf_init((char *)&write_ctx->payload, sizeof(write_ctx->payload));
|
||||
writebuf[2] = uv_buf_init((char *)cmdinfo->command_to_run, write_ctx->payload.command_length);
|
||||
|
||||
#ifdef SPAWN_DEBUG
|
||||
info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial);
|
||||
#endif
|
||||
ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&spawn_channel, writebuf, 3, after_pipe_write);
|
||||
assert(ret == 0);
|
||||
}
|
||||
|
||||
void spawn_client(void *arg)
|
||||
{
|
||||
int ret;
|
||||
struct completion *completion = (struct completion *)arg;
|
||||
|
||||
loop = mallocz(sizeof(uv_loop_t));
|
||||
ret = uv_loop_init(loop);
|
||||
if (ret) {
|
||||
error("uv_loop_init(): %s", uv_strerror(ret));
|
||||
spawn_thread_error = ret;
|
||||
goto error_after_loop_init;
|
||||
}
|
||||
loop->data = NULL;
|
||||
|
||||
spawn_async.data = NULL;
|
||||
ret = uv_async_init(loop, &spawn_async, async_cb);
|
||||
if (ret) {
|
||||
error("uv_async_init(): %s", uv_strerror(ret));
|
||||
spawn_thread_error = ret;
|
||||
goto error_after_async_init;
|
||||
}
|
||||
|
||||
ret = uv_pipe_init(loop, &spawn_channel, 1);
|
||||
if (ret) {
|
||||
error("uv_pipe_init(): %s", uv_strerror(ret));
|
||||
spawn_thread_error = ret;
|
||||
goto error_after_pipe_init;
|
||||
}
|
||||
assert(spawn_channel.ipc);
|
||||
|
||||
ret = create_spawn_server(loop, &spawn_channel, &process);
|
||||
if (ret) {
|
||||
error("Failed to fork spawn server process.");
|
||||
spawn_thread_error = ret;
|
||||
goto error_after_spawn_server;
|
||||
}
|
||||
|
||||
spawn_thread_error = 0;
|
||||
spawn_thread_shutdown = 0;
|
||||
/* wake up initialization thread */
|
||||
complete(completion);
|
||||
|
||||
prot_buffer_len = 0;
|
||||
ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read);
|
||||
assert(ret == 0);
|
||||
|
||||
while (spawn_thread_shutdown == 0) {
|
||||
struct spawn_cmd_info *cmdinfo;
|
||||
|
||||
uv_run(loop, UV_RUN_DEFAULT);
|
||||
while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) {
|
||||
spawn_process_cmd(cmdinfo);
|
||||
}
|
||||
}
|
||||
/* cleanup operations of the event loop */
|
||||
info("Shutting down spawn client event loop.");
|
||||
uv_close((uv_handle_t *)&spawn_channel, NULL);
|
||||
uv_close((uv_handle_t *)&spawn_async, NULL);
|
||||
uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
|
||||
|
||||
info("Shutting down spawn client loop complete.");
|
||||
assert(0 == uv_loop_close(loop));
|
||||
|
||||
return;
|
||||
|
||||
error_after_spawn_server:
|
||||
uv_close((uv_handle_t *)&spawn_channel, NULL);
|
||||
error_after_pipe_init:
|
||||
uv_close((uv_handle_t *)&spawn_async, NULL);
|
||||
error_after_async_init:
|
||||
uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
|
||||
assert(0 == uv_loop_close(loop));
|
||||
error_after_loop_init:
|
||||
freez(loop);
|
||||
|
||||
/* wake up initialization thread */
|
||||
complete(completion);
|
||||
}
|
||||
377
spawn/spawn_server.c
Normal file
377
spawn/spawn_server.c
Normal file
@@ -0,0 +1,377 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "spawn.h"
|
||||
|
||||
static uv_loop_t *loop;
|
||||
static uv_pipe_t server_pipe;
|
||||
|
||||
static int server_shutdown = 0;
|
||||
|
||||
static uv_thread_t thread;
|
||||
|
||||
/* spawn outstanding execution structure */
|
||||
static avl_tree_lock spawn_outstanding_exec_tree;
|
||||
|
||||
static char prot_buffer[MAX_COMMAND_LENGTH];
|
||||
static unsigned prot_buffer_len = 0;
|
||||
|
||||
struct spawn_execution_info {
|
||||
avl avl;
|
||||
|
||||
void *handle;
|
||||
int exit_status;
|
||||
pid_t pid;
|
||||
struct spawn_execution_info *next;
|
||||
};
|
||||
|
||||
int spawn_exec_compare(void *a, void *b)
|
||||
{
|
||||
struct spawn_execution_info *spwna = a, *spwnb = b;
|
||||
|
||||
if (spwna->pid < spwnb->pid) return -1;
|
||||
if (spwna->pid > spwnb->pid) return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* wake up waiter thread to reap the spawned processes */
|
||||
static uv_mutex_t wait_children_mutex;
|
||||
static uv_cond_t wait_children_cond;
|
||||
static uint8_t spawned_processes;
|
||||
static struct spawn_execution_info *child_waited_list;
|
||||
static uv_async_t child_waited_async;
|
||||
|
||||
static inline struct spawn_execution_info *dequeue_child_waited_list(void)
|
||||
{
|
||||
struct spawn_execution_info *exec_info;
|
||||
|
||||
uv_mutex_lock(&wait_children_mutex);
|
||||
if (NULL == child_waited_list) {
|
||||
exec_info = NULL;
|
||||
} else {
|
||||
exec_info = child_waited_list;
|
||||
child_waited_list = exec_info->next;
|
||||
}
|
||||
uv_mutex_unlock(&wait_children_mutex);
|
||||
|
||||
return exec_info;
|
||||
}
|
||||
|
||||
static inline void enqueue_child_waited_list(struct spawn_execution_info *exec_info)
|
||||
{
|
||||
uv_mutex_lock(&wait_children_mutex);
|
||||
exec_info->next = child_waited_list;
|
||||
child_waited_list = exec_info;
|
||||
uv_mutex_unlock(&wait_children_mutex);
|
||||
}
|
||||
|
||||
static void after_pipe_write(uv_write_t *req, int status)
|
||||
{
|
||||
(void)status;
|
||||
#ifdef SPAWN_DEBUG
|
||||
fprintf(stderr, "SERVER %s called status=%d\n", __func__, status);
|
||||
#endif
|
||||
freez(req->data);
|
||||
}
|
||||
|
||||
static void child_waited_async_cb(uv_async_t *async_handle)
|
||||
{
|
||||
uv_buf_t writebuf[2];
|
||||
int ret;
|
||||
struct spawn_execution_info *exec_info;
|
||||
struct write_context *write_ctx;
|
||||
|
||||
(void)async_handle;
|
||||
while (NULL != (exec_info = dequeue_child_waited_list())) {
|
||||
write_ctx = mallocz(sizeof(*write_ctx));
|
||||
write_ctx->write_req.data = write_ctx;
|
||||
|
||||
|
||||
write_ctx->header.opcode = SPAWN_PROT_CMD_EXIT_STATUS;
|
||||
write_ctx->header.handle = exec_info->handle;
|
||||
write_ctx->exit_status.exec_exit_status = exec_info->exit_status;
|
||||
writebuf[0] = uv_buf_init((char *) &write_ctx->header, sizeof(write_ctx->header));
|
||||
writebuf[1] = uv_buf_init((char *) &write_ctx->exit_status, sizeof(write_ctx->exit_status));
|
||||
#ifdef SPAWN_DEBUG
|
||||
fprintf(stderr, "SERVER %s SPAWN_PROT_CMD_EXIT_STATUS\n", __func__);
|
||||
#endif
|
||||
ret = uv_write(&write_ctx->write_req, (uv_stream_t *) &server_pipe, writebuf, 2, after_pipe_write);
|
||||
assert(ret == 0);
|
||||
|
||||
freez(exec_info);
|
||||
}
|
||||
}
|
||||
|
||||
static void wait_children(void *arg)
|
||||
{
|
||||
siginfo_t i;
|
||||
struct spawn_execution_info tmp, *exec_info;
|
||||
avl *ret_avl;
|
||||
|
||||
(void)arg;
|
||||
while (!server_shutdown) {
|
||||
uv_mutex_lock(&wait_children_mutex);
|
||||
while (!spawned_processes) {
|
||||
uv_cond_wait(&wait_children_cond, &wait_children_mutex);
|
||||
}
|
||||
spawned_processes = 0;
|
||||
uv_mutex_unlock(&wait_children_mutex);
|
||||
|
||||
while (!server_shutdown) {
|
||||
i.si_pid = 0;
|
||||
if (waitid(P_ALL, (id_t) 0, &i, WEXITED) == -1) {
|
||||
if (errno != ECHILD)
|
||||
fprintf(stderr, "SPAWN: Failed to wait: %s\n", strerror(errno));
|
||||
break;
|
||||
}
|
||||
if (i.si_pid == 0) {
|
||||
fprintf(stderr, "SPAWN: No child exited.\n");
|
||||
break;
|
||||
}
|
||||
#ifdef SPAWN_DEBUG
|
||||
fprintf(stderr, "SPAWN: Successfully waited for pid:%d.\n", (int) i.si_pid);
|
||||
#endif
|
||||
assert(CLD_EXITED == i.si_code);
|
||||
tmp.pid = (pid_t)i.si_pid;
|
||||
while (NULL == (ret_avl = avl_remove_lock(&spawn_outstanding_exec_tree, (avl *)&tmp))) {
|
||||
fprintf(stderr,
|
||||
"SPAWN: race condition detected, waiting for child process %d to be indexed.\n",
|
||||
(int)tmp.pid);
|
||||
(void)sleep_usec(10000); /* 10 msec */
|
||||
}
|
||||
exec_info = (struct spawn_execution_info *)ret_avl;
|
||||
exec_info->exit_status = i.si_status;
|
||||
enqueue_child_waited_list(exec_info);
|
||||
|
||||
/* wake up event loop */
|
||||
assert(0 == uv_async_send(&child_waited_async));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void spawn_protocol_execute_command(void *handle, char *command_to_run, uint16_t command_length)
|
||||
{
|
||||
uv_buf_t writebuf[2];
|
||||
int ret;
|
||||
avl *avl_ret;
|
||||
struct spawn_execution_info *exec_info;
|
||||
struct write_context *write_ctx;
|
||||
|
||||
write_ctx = mallocz(sizeof(*write_ctx));
|
||||
write_ctx->write_req.data = write_ctx;
|
||||
|
||||
command_to_run[command_length] = '\0';
|
||||
#ifdef SPAWN_DEBUG
|
||||
fprintf(stderr, "SPAWN: executing command '%s'\n", command_to_run);
|
||||
#endif
|
||||
if (netdata_spawn(command_to_run, &write_ctx->spawn_result.exec_pid)) {
|
||||
fprintf(stderr, "SPAWN: Cannot spawn(\"%s\", \"r\").\n", command_to_run);
|
||||
write_ctx->spawn_result.exec_pid = 0;
|
||||
} else { /* successfully spawned command */
|
||||
write_ctx->spawn_result.exec_run_timestamp = now_realtime_sec();
|
||||
|
||||
/* record it for when the process finishes execution */
|
||||
exec_info = mallocz(sizeof(*exec_info));
|
||||
exec_info->handle = handle;
|
||||
exec_info->pid = write_ctx->spawn_result.exec_pid;
|
||||
avl_ret = avl_insert_lock(&spawn_outstanding_exec_tree, (avl *)exec_info);
|
||||
assert(avl_ret == (avl *)exec_info);
|
||||
|
||||
/* wake up the thread that blocks waiting for processes to exit */
|
||||
uv_mutex_lock(&wait_children_mutex);
|
||||
spawned_processes = 1;
|
||||
uv_cond_signal(&wait_children_cond);
|
||||
uv_mutex_unlock(&wait_children_mutex);
|
||||
}
|
||||
|
||||
write_ctx->header.opcode = SPAWN_PROT_SPAWN_RESULT;
|
||||
write_ctx->header.handle = handle;
|
||||
writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header));
|
||||
writebuf[1] = uv_buf_init((char *)&write_ctx->spawn_result, sizeof(write_ctx->spawn_result));
|
||||
#ifdef SPAWN_DEBUG
|
||||
fprintf(stderr, "SERVER %s SPAWN_PROT_SPAWN_RESULT\n", __func__);
|
||||
#endif
|
||||
ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&server_pipe, writebuf, 2, after_pipe_write);
|
||||
assert(ret == 0);
|
||||
}
|
||||
|
||||
static void server_parse_spawn_protocol(unsigned source_len, char *source)
|
||||
{
|
||||
unsigned required_len;
|
||||
struct spawn_prot_header *header;
|
||||
struct spawn_prot_exec_cmd *payload;
|
||||
uint16_t command_length;
|
||||
|
||||
while (source_len) {
|
||||
required_len = sizeof(*header);
|
||||
if (prot_buffer_len < required_len)
|
||||
copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
|
||||
if (prot_buffer_len < required_len)
|
||||
return; /* Source buffer ran out */
|
||||
|
||||
header = (struct spawn_prot_header *)prot_buffer;
|
||||
assert(SPAWN_PROT_EXEC_CMD == header->opcode);
|
||||
assert(NULL != header->handle);
|
||||
|
||||
required_len += sizeof(*payload);
|
||||
if (prot_buffer_len < required_len)
|
||||
copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
|
||||
if (prot_buffer_len < required_len)
|
||||
return; /* Source buffer ran out */
|
||||
|
||||
payload = (struct spawn_prot_exec_cmd *)(header + 1);
|
||||
command_length = payload->command_length;
|
||||
|
||||
required_len += command_length;
|
||||
if (unlikely(required_len > MAX_COMMAND_LENGTH - 1)) {
|
||||
fprintf(stderr, "SPAWN: Ran out of protocol buffer space.\n");
|
||||
command_length = (MAX_COMMAND_LENGTH - 1) - (sizeof(*header) + sizeof(*payload));
|
||||
required_len = MAX_COMMAND_LENGTH - 1;
|
||||
}
|
||||
if (prot_buffer_len < required_len)
|
||||
copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
|
||||
if (prot_buffer_len < required_len)
|
||||
return; /* Source buffer ran out */
|
||||
|
||||
spawn_protocol_execute_command(header->handle, payload->command_to_run, command_length);
|
||||
prot_buffer_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static void on_pipe_read(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf)
|
||||
{
|
||||
if (0 == nread) {
|
||||
fprintf(stderr, "SERVER %s: Zero bytes read from spawn pipe.\n", __func__);
|
||||
} else if (UV_EOF == nread) {
|
||||
fprintf(stderr, "EOF found in spawn pipe.\n");
|
||||
} else if (nread < 0) {
|
||||
fprintf(stderr, "%s: %s\n", __func__, uv_strerror(nread));
|
||||
}
|
||||
|
||||
if (nread < 0) { /* stop spawn server due to EOF or error */
|
||||
int error;
|
||||
|
||||
uv_mutex_lock(&wait_children_mutex);
|
||||
server_shutdown = 1;
|
||||
spawned_processes = 1;
|
||||
uv_cond_signal(&wait_children_cond);
|
||||
uv_mutex_unlock(&wait_children_mutex);
|
||||
|
||||
fprintf(stderr, "Shutting down spawn server event loop.\n");
|
||||
/* cleanup operations of the event loop */
|
||||
(void)uv_read_stop((uv_stream_t *) pipe);
|
||||
uv_close((uv_handle_t *)&server_pipe, NULL);
|
||||
|
||||
error = uv_thread_join(&thread);
|
||||
if (error) {
|
||||
fprintf(stderr, "uv_thread_create(): %s", uv_strerror(error));
|
||||
}
|
||||
/* After joining it is safe to destroy child_waited_async */
|
||||
uv_close((uv_handle_t *)&child_waited_async, NULL);
|
||||
} else if (nread) {
|
||||
#ifdef SPAWN_DEBUG
|
||||
fprintf(stderr, "SERVER %s nread %u\n", __func__, (unsigned)nread);
|
||||
#endif
|
||||
server_parse_spawn_protocol(nread, buf->base);
|
||||
}
|
||||
if (buf && buf->len) {
|
||||
freez(buf->base);
|
||||
}
|
||||
}
|
||||
|
||||
static void on_read_alloc(uv_handle_t *handle,
|
||||
size_t suggested_size,
|
||||
uv_buf_t* buf)
|
||||
{
|
||||
(void)handle;
|
||||
buf->base = mallocz(suggested_size);
|
||||
buf->len = suggested_size;
|
||||
}
|
||||
|
||||
static void ignore_signal_handler(int signo) {
|
||||
/*
|
||||
* By having a signal handler we allow spawned processes to reset default signal dispositions. Setting SIG_IGN
|
||||
* would be inherited by the spawned children which is not desirable.
|
||||
*/
|
||||
(void)signo;
|
||||
}
|
||||
|
||||
void spawn_server(void)
|
||||
{
|
||||
int error;
|
||||
|
||||
test_clock_boottime();
|
||||
test_clock_monotonic_coarse();
|
||||
|
||||
// close all open file descriptors, except the standard ones
|
||||
// the caller may have left open files (lxc-attach has this issue)
|
||||
int fd;
|
||||
for(fd = (int)(sysconf(_SC_OPEN_MAX) - 1) ; fd > 2 ; --fd)
|
||||
if(fd_is_valid(fd))
|
||||
close(fd);
|
||||
|
||||
// Have the libuv IPC pipe be closed when forking child processes
|
||||
(void) fcntl(0, F_SETFD, FD_CLOEXEC);
|
||||
fprintf(stderr, "Spawn server is up.\n");
|
||||
|
||||
// Define signals we want to ignore
|
||||
struct sigaction sa;
|
||||
int signals_to_ignore[] = {SIGPIPE, SIGINT, SIGQUIT, SIGTERM, SIGHUP, SIGUSR1, SIGUSR2, SIGBUS, SIGCHLD};
|
||||
unsigned ignore_length = sizeof(signals_to_ignore) / sizeof(signals_to_ignore[0]);
|
||||
|
||||
unsigned i;
|
||||
for (i = 0; i < ignore_length ; ++i) {
|
||||
sa.sa_flags = 0;
|
||||
sigemptyset(&sa.sa_mask);
|
||||
sa.sa_handler = ignore_signal_handler;
|
||||
if(sigaction(signals_to_ignore[i], &sa, NULL) == -1)
|
||||
fprintf(stderr, "SPAWN: Failed to change signal handler for signal: %d.\n", signals_to_ignore[i]);
|
||||
}
|
||||
|
||||
signals_unblock();
|
||||
|
||||
loop = uv_default_loop();
|
||||
loop->data = NULL;
|
||||
|
||||
error = uv_pipe_init(loop, &server_pipe, 1);
|
||||
if (error) {
|
||||
fprintf(stderr, "uv_pipe_init(): %s\n", uv_strerror(error));
|
||||
exit(error);
|
||||
}
|
||||
assert(server_pipe.ipc);
|
||||
|
||||
error = uv_pipe_open(&server_pipe, 0 /* UV_STDIN_FD */);
|
||||
if (error) {
|
||||
fprintf(stderr, "uv_pipe_open(): %s\n", uv_strerror(error));
|
||||
exit(error);
|
||||
}
|
||||
avl_init_lock(&spawn_outstanding_exec_tree, spawn_exec_compare);
|
||||
|
||||
spawned_processes = 0;
|
||||
assert(0 == uv_cond_init(&wait_children_cond));
|
||||
assert(0 == uv_mutex_init(&wait_children_mutex));
|
||||
child_waited_list = NULL;
|
||||
error = uv_async_init(loop, &child_waited_async, child_waited_async_cb);
|
||||
if (error) {
|
||||
fprintf(stderr, "uv_async_init(): %s\n", uv_strerror(error));
|
||||
exit(error);
|
||||
}
|
||||
|
||||
error = uv_thread_create(&thread, wait_children, NULL);
|
||||
if (error) {
|
||||
fprintf(stderr, "uv_thread_create(): %s\n", uv_strerror(error));
|
||||
exit(error);
|
||||
}
|
||||
|
||||
prot_buffer_len = 0;
|
||||
error = uv_read_start((uv_stream_t *)&server_pipe, on_read_alloc, on_pipe_read);
|
||||
assert(error == 0);
|
||||
|
||||
while (!server_shutdown) {
|
||||
uv_run(loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
fprintf(stderr, "Shutting down spawn server loop complete.\n");
|
||||
assert(0 == uv_loop_close(loop));
|
||||
|
||||
exit(0);
|
||||
}
|
||||
Reference in New Issue
Block a user