| /* |
| * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved. |
| * Copyright (C) 2004-2014 Red Hat, Inc. All rights reserved. |
| * |
| * This file is part of LVM2. |
| * |
| * This copyrighted material is made available to anyone wishing to use, |
| * modify, copy, or redistribute it subject to the terms and conditions |
| * of the GNU General Public License v.2. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| */ |
| |
| /* |
| * CLVMD: Cluster LVM daemon |
| */ |
| |
| #include "clvmd-common.h" |
| |
| #include "clvmd-comms.h" |
| #include "clvm.h" |
| #include "clvmd.h" |
| #include "lvm-functions.h" |
| #include "lvm-version.h" |
| #include "refresh_clvmd.h" |
| |
| #ifdef HAVE_COROSYNC_CONFDB_H |
| #include <corosync/confdb.h> |
| #endif |
| |
| #include <pthread.h> |
| #include <getopt.h> |
| #include <ctype.h> |
| #include <stdarg.h> |
| |
| #include <fcntl.h> |
| #include <netinet/in.h> |
| #include <signal.h> |
| #include <stddef.h> |
| #include <syslog.h> |
| #include <sys/un.h> |
| #include <sys/utsname.h> |
| |
/* Fallback boolean constants, only if no included header already defines them. */
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif

#define MAX_RETRIES 4
#define MAX_MISSING_LEN 8000 /* Max supported clvmd message size ? */

/*
 * Non-zero when csid c equals this node's own cluster id; compares the
 * first max_csid_len bytes (set from the chosen cluster manager in main()).
 */
#define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
| |
/* Head of the fd list. Also contains
   the cluster_socket details */
static struct local_client local_client_head;

static unsigned short global_xid = 0; /* Last transaction ID issued */

/* Operations of the cluster manager selected in main(); NULL until then */
struct cluster_ops *clops = NULL;

/*
 * This node's cluster id and the size limits of the selected cluster
 * manager; all filled in by main() once a manager has been initialised.
 */
static char our_csid[MAX_CSID_LEN];
static unsigned max_csid_len;
static unsigned max_cluster_message;
static unsigned max_cluster_member_name_len;

/* Structure of items on the LVM thread list */
struct lvm_thread_cmd {
	struct dm_list list;

	struct local_client *client;
	struct clvm_header *msg;
	char csid[MAX_CSID_LEN];
	int remote; /* Flag */
	int msglen;
	unsigned short xid;
};

/* Argument main() passes to lvm_thread_fn() */
struct lvm_startup_params {
	struct dm_hash_table *excl_uuid; /* lock uuids given with -E */
};

static debug_t debug = DEBUG_OFF;
static int foreground_mode = 0;
static pthread_t lvm_thread;
/* Stack size 128KiB for thread, must be bigger than DEFAULT_RESERVED_STACK */
static const size_t STACK_SIZE = 128 * 1024;
static pthread_attr_t stack_attr;
/* Set under lvm_thread_mutex (with lvm_thread_cond signalled) to stop the LVM thread */
static int lvm_thread_exit = 0;
static pthread_mutex_t lvm_thread_mutex;
static pthread_cond_t lvm_thread_cond;
/* Two-party barrier: main() waits on it until the LVM thread is ready */
static pthread_barrier_t lvm_start_barrier;
static struct dm_list lvm_cmd_head;
/*
 * Async-signal-safe flags polled by main_loop(); presumably set by the
 * SIGTERM/SIGHUP handlers declared below (their bodies are not shown here).
 */
static volatile sig_atomic_t quit = 0;
static volatile sig_atomic_t reread_config = 0;
/*
 * Status pipe between the daemon and the parent waiting in be_daemon().
 * child_pipe[1] == 0 means "no pipe" (e.g. foreground mode).
 */
static int child_pipe[2];
| |
/*
 * Reasons the daemon failed initialisation.  These are sent to the parent
 * through child_pipe, and the parent exits with the received value
 * (see wait_for_child()).
 */
#define DFAIL_INIT 1
#define DFAIL_LOCAL_SOCK 2
#define DFAIL_CLUSTER_IF 3
#define DFAIL_MALLOC 4
#define DFAIL_TIMEOUT 5
#define SUCCESS 0

/* Cluster manager interface, chosen with -I or by get_cluster_type() when IF_AUTO */
typedef enum {IF_AUTO, IF_CMAN, IF_OPENAIS, IF_COROSYNC, IF_SINGLENODE} if_type_t;
| |
/* Prototypes for code further down (all file-local except where noted elsewhere) */
static void sigusr2_handler(int sig);
static void sighup_handler(int sig);
static void sigterm_handler(int sig);
static void send_local_reply(struct local_client *client, int status,
			     int clientid);
static void free_reply(struct local_client *client);
static void send_version_message(void);
static void *pre_and_post_thread(void *arg);
static int send_message(void *buf, int msglen, const char *csid, int fd,
			const char *errtext);
static int read_from_local_sock(struct local_client *thisfd);
static int cleanup_zombie(struct local_client *thisfd);
static int process_local_command(struct clvm_header *msg, int msglen,
				 struct local_client *client,
				 unsigned short xid);
static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
				   const char *csid);
static int process_reply(const struct clvm_header *msg, int msglen,
			 const char *csid);
static int open_local_sock(void);
static void close_local_sock(int local_socket);
static int check_local_clvmd(void);
static struct local_client *find_client(int clientid);
static void main_loop(int cmd_timeout);
static void be_daemon(int start_timeout);
static int check_all_clvmds_running(struct local_client *client);
static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
				     int len, const char *csid,
				     struct local_client **new_client);
static void *lvm_thread_fn(void *) __attribute__((noreturn));
static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
			   int msglen, const char *csid);
static int distribute_command(struct local_client *thisfd);
static void hton_clvm(struct clvm_header *hdr);
static void ntoh_clvm(struct clvm_header *hdr);
static void add_reply_to_list(struct local_client *client, int status,
			      const char *csid, const char *buf, int len);
static if_type_t parse_cluster_interface(char *ifname);
static if_type_t get_cluster_type(void);
| |
/*
 * Write the command-line help for prog to file.  The list of cluster
 * managers reflects the USE_* options this binary was built with.
 */
static void usage(const char *prog, FILE *file)
{
	/* Space-separated names of the cluster managers compiled in. */
	static const char managers[] = ""
#ifdef USE_COROSYNC
		"corosync "
#endif
#ifdef USE_CMAN
		"cman "
#endif
#ifdef USE_OPENAIS
		"openais "
#endif
#ifdef USE_SINGLENODE
		"singlenode "
#endif
		;

	fprintf(file, "Usage: %s [options]\n", prog);

	fputs(" -C Sets debug level (from -d) on all clvmd instances clusterwide\n"
	      " -d[<n>] Set debug logging (0:none, 1:stderr (implies -f option), 2:syslog)\n"
	      " -E<uuid> Take this lock uuid as exclusively locked resource (for restart)\n"
	      " -f Don't fork, run in the foreground\n"
	      " -h Show this help information\n"
	      " -I<cmgr> Cluster manager (default: auto)\n"
	      " Available cluster managers: ", file);

	fputs(managers, file);

	fputs("\n"
	      " -R Tell all running clvmds in the cluster to reload their device cache\n"
	      " -S Restart clvmd, preserving exclusive locks\n"
	      " -t<secs> Command timeout (default: 60 seconds)\n"
	      " -T<secs> Startup timeout (default: 0 seconds)\n"
	      " -V Show version of clvmd\n"
	      "\n", file);
}
| |
| /* Called to signal the parent how well we got on during initialisation */ |
| static void child_init_signal(int status) |
| { |
| if (child_pipe[1]) { |
| /* FIXME Use a proper wrapper around write */ |
| if (write(child_pipe[1], &status, sizeof(status)) < 0) |
| log_sys_error("write", "child_pipe"); |
| if (close(child_pipe[1])) |
| log_sys_error("close", "child_pipe"); |
| } |
| } |
| |
/*
 * Report status to the waiting parent (if there is one) via
 * child_init_signal(), then exit using status as the exit code.
 */
static __attribute__((noreturn)) void child_init_signal_and_exit(int status)
{
	child_init_signal(status);
	exit(status);
}
| |
/*
 * Close *fd if it holds a descriptor (>= 0) and set it to -1 first, so
 * the same descriptor is never closed twice through this pointer.
 * close() failures are only logged.
 */
static void safe_close(int *fd)
{
	int victim = *fd;

	if (victim < 0)
		return;

	*fd = -1;

	if (close(victim))
		log_sys_error("close", ""); /* path */
}
| |
| void debuglog(const char *fmt, ...) |
| { |
| time_t P; |
| va_list ap; |
| static int syslog_init = 0; |
| char buf_ctime[64]; |
| |
| switch (clvmd_get_debug()) { |
| case DEBUG_STDERR: |
| va_start(ap,fmt); |
| time(&P); |
| fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime_r(&P, buf_ctime) + 4); |
| vfprintf(stderr, fmt, ap); |
| va_end(ap); |
| fflush(stderr); |
| break; |
| case DEBUG_SYSLOG: |
| if (!syslog_init) { |
| openlog("clvmd", LOG_PID, LOG_DAEMON); |
| syslog_init = 1; |
| } |
| |
| va_start(ap,fmt); |
| vsyslog(LOG_DEBUG, fmt, ap); |
| va_end(ap); |
| break; |
| case DEBUG_OFF: |
| break; |
| } |
| } |
| |
| void clvmd_set_debug(debug_t new_debug) |
| { |
| if (!foreground_mode && new_debug == DEBUG_STDERR) |
| new_debug = DEBUG_SYSLOG; |
| |
| if (new_debug > DEBUG_SYSLOG) |
| new_debug = DEBUG_SYSLOG; |
| |
| debug = new_debug; |
| } |
| |
/* Current debug destination, as last set by clvmd_set_debug(). */
debug_t clvmd_get_debug(void)
{
	return debug;
}

/* Non-zero when running in the foreground (-f, or -d with the stderr level). */
int clvmd_get_foreground(void)
{
	return foreground_mode;
}
| |
| static const char *decode_cmd(unsigned char cmdl) |
| { |
| static char buf[128]; |
| const char *command; |
| |
| switch (cmdl) { |
| case CLVMD_CMD_TEST: |
| command = "TEST"; |
| break; |
| case CLVMD_CMD_LOCK_VG: |
| command = "LOCK_VG"; |
| break; |
| case CLVMD_CMD_LOCK_LV: |
| command = "LOCK_LV"; |
| break; |
| case CLVMD_CMD_REFRESH: |
| command = "REFRESH"; |
| break; |
| case CLVMD_CMD_SET_DEBUG: |
| command = "SET_DEBUG"; |
| break; |
| case CLVMD_CMD_GET_CLUSTERNAME: |
| command = "GET_CLUSTERNAME"; |
| break; |
| case CLVMD_CMD_VG_BACKUP: |
| command = "VG_BACKUP"; |
| break; |
| case CLVMD_CMD_REPLY: |
| command = "REPLY"; |
| break; |
| case CLVMD_CMD_VERSION: |
| command = "VERSION"; |
| break; |
| case CLVMD_CMD_GOAWAY: |
| command = "GOAWAY"; |
| break; |
| case CLVMD_CMD_LOCK: |
| command = "LOCK"; |
| break; |
| case CLVMD_CMD_UNLOCK: |
| command = "UNLOCK"; |
| break; |
| case CLVMD_CMD_LOCK_QUERY: |
| command = "LOCK_QUERY"; |
| break; |
| case CLVMD_CMD_RESTART: |
| command = "RESTART"; |
| break; |
| case CLVMD_CMD_SYNC_NAMES: |
| command = "SYNC_NAMES"; |
| break; |
| default: |
| command = "unknown"; |
| break; |
| } |
| |
| snprintf(buf, sizeof(buf), "%s (0x%x)", command, cmdl); |
| |
| return buf; |
| } |
| |
/*
 * Delete the pidfile.  main() registers this with atexit() once the
 * pidfile has been created.  Failure is only logged.
 */
static void remove_lockfile(void)
{
	if (unlink(CLVMD_PIDFILE))
		log_sys_error("unlink", CLVMD_PIDFILE);
}
| |
/*
 * clvmd requires the dm-ioctl capability, so both the real and the
 * effective uid must be root.  Otherwise log an error and exit with
 * status 4 straight away, rather than failing later on the first
 * root-only operation.
 */
static void check_permissions(void)
{
	if (!getuid() && !geteuid())
		return;

	log_error("Cannot run as a non-root user.");

	/* 4 is the LSB init-script status for "insufficient privilege". */
	exit(4);
}
| |
| int main(int argc, char *argv[]) |
| { |
| int local_sock; |
| struct local_client *newfd, *delfd; |
| struct lvm_startup_params lvm_params; |
| int opt; |
| int cmd_timeout = DEFAULT_CMD_TIMEOUT; |
| int start_timeout = 0; |
| if_type_t cluster_iface = IF_AUTO; |
| sigset_t ss; |
| debug_t debug_opt = DEBUG_OFF; |
| debug_t debug_arg = DEBUG_OFF; |
| int clusterwide_opt = 0; |
| mode_t old_mask; |
| int ret = 1; |
| |
| struct option longopts[] = { |
| { "help", 0, 0, 'h' }, |
| { NULL, 0, 0, 0 } |
| }; |
| |
| if (!(lvm_params.excl_uuid = dm_hash_create(128))) { |
| fprintf(stderr, "Failed to allocate hash table\n"); |
| return 1; |
| } |
| |
| /* Deal with command-line arguments */ |
| opterr = 0; |
| optind = 0; |
| while ((opt = getopt_long(argc, argv, "Vhfd:t:RST:CI:E:", |
| longopts, NULL)) != -1) { |
| switch (opt) { |
| case 'h': |
| usage(argv[0], stdout); |
| exit(0); |
| |
| case 'R': |
| check_permissions(); |
| ret = (refresh_clvmd(1) == 1) ? 0 : 1; |
| goto out; |
| |
| case 'S': |
| check_permissions(); |
| ret = (restart_clvmd(clusterwide_opt) == 1) ? 0 : 1; |
| goto out; |
| |
| case 'C': |
| clusterwide_opt = 1; |
| break; |
| |
| case 'd': |
| debug_opt = DEBUG_STDERR; |
| debug_arg = (debug_t) atoi(optarg); |
| if (debug_arg == DEBUG_STDERR) |
| foreground_mode = 1; |
| break; |
| |
| case 'f': |
| foreground_mode = 1; |
| break; |
| case 't': |
| cmd_timeout = atoi(optarg); |
| if (!cmd_timeout) { |
| fprintf(stderr, "command timeout is invalid\n"); |
| usage(argv[0], stderr); |
| exit(1); |
| } |
| break; |
| case 'I': |
| cluster_iface = parse_cluster_interface(optarg); |
| break; |
| case 'E': |
| if (!dm_hash_insert(lvm_params.excl_uuid, optarg, optarg)) { |
| fprintf(stderr, "Failed to allocate hash entry\n"); |
| goto out; |
| } |
| break; |
| case 'T': |
| start_timeout = atoi(optarg); |
| if (start_timeout <= 0) { |
| fprintf(stderr, "startup timeout is invalid\n"); |
| usage(argv[0], stderr); |
| exit(1); |
| } |
| break; |
| |
| case 'V': |
| printf("Cluster LVM daemon version: %s\n", LVM_VERSION); |
| printf("Protocol version: %d.%d.%d\n", |
| CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION, |
| CLVMD_PATCH_VERSION); |
| exit(0); |
| break; |
| |
| default: |
| usage(argv[0], stderr); |
| exit(2); |
| } |
| } |
| |
| check_permissions(); |
| |
| /* |
| * Switch to C locale to avoid reading large locale-archive file |
| * used by some glibc (on some distributions it takes over 100MB). |
| * Daemon currently needs to use mlockall(). |
| */ |
| if (setenv("LC_ALL", "C", 1)) |
| perror("Cannot set LC_ALL to C"); |
| |
| /* Setting debug options on an existing clvmd */ |
| if (debug_opt && !check_local_clvmd()) { |
| dm_hash_destroy(lvm_params.excl_uuid); |
| return debug_clvmd(debug_arg, clusterwide_opt)==1?0:1; |
| } |
| |
| clvmd_set_debug(debug_arg); |
| |
| /* Fork into the background (unless requested not to) */ |
| if (!foreground_mode) |
| be_daemon(start_timeout); |
| |
| (void) dm_prepare_selinux_context(DEFAULT_RUN_DIR, S_IFDIR); |
| old_mask = umask(0077); |
| if (dm_create_dir(DEFAULT_RUN_DIR) == 0) { |
| DEBUGLOG("clvmd: unable to create %s directory\n", |
| DEFAULT_RUN_DIR); |
| umask(old_mask); |
| exit(1); |
| } |
| umask(old_mask); |
| |
| /* Create pidfile */ |
| (void) dm_prepare_selinux_context(CLVMD_PIDFILE, S_IFREG); |
| if (dm_create_lockfile(CLVMD_PIDFILE) == 0) { |
| DEBUGLOG("clvmd: unable to create lockfile\n"); |
| exit(1); |
| } |
| (void) dm_prepare_selinux_context(NULL, 0); |
| |
| atexit(remove_lockfile); |
| |
| DEBUGLOG("CLVMD started\n"); |
| |
| /* Open the Unix socket we listen for commands on. |
| We do this before opening the cluster socket so that |
| potential clients will block rather than error if we are running |
| but the cluster is not ready yet */ |
| local_sock = open_local_sock(); |
| if (local_sock < 0) { |
| child_init_signal_and_exit(DFAIL_LOCAL_SOCK); |
| /* NOTREACHED */ |
| } |
| |
| /* Set up signal handlers, USR1 is for cluster change notifications (in cman) |
| USR2 causes child threads to exit. |
| (HUP used to cause gulm to re-read the nodes list from CCS.) |
| PIPE should be ignored */ |
| signal(SIGUSR2, sigusr2_handler); |
| signal(SIGHUP, sighup_handler); |
| signal(SIGPIPE, SIG_IGN); |
| |
| /* Block SIGUSR2/SIGINT/SIGTERM in process */ |
| sigemptyset(&ss); |
| sigaddset(&ss, SIGUSR2); |
| sigaddset(&ss, SIGINT); |
| sigaddset(&ss, SIGTERM); |
| sigprocmask(SIG_BLOCK, &ss, NULL); |
| |
| /* Initialise the LVM thread variables */ |
| dm_list_init(&lvm_cmd_head); |
| if (pthread_attr_init(&stack_attr) || |
| pthread_attr_setstacksize(&stack_attr, STACK_SIZE)) { |
| log_sys_error("pthread_attr_init", ""); |
| exit(1); |
| } |
| pthread_mutex_init(&lvm_thread_mutex, NULL); |
| pthread_cond_init(&lvm_thread_cond, NULL); |
| pthread_barrier_init(&lvm_start_barrier, NULL, 2); |
| init_lvhash(); |
| |
| /* Start the cluster interface */ |
| if (cluster_iface == IF_AUTO) |
| cluster_iface = get_cluster_type(); |
| |
| #ifdef USE_CMAN |
| if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && |
| (clops = init_cman_cluster())) { |
| max_csid_len = CMAN_MAX_CSID_LEN; |
| max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE; |
| max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN; |
| syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN"); |
| } |
| #endif |
| #ifdef USE_COROSYNC |
| if (!clops) |
| if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && |
| (clops = init_corosync_cluster()))) { |
| max_csid_len = COROSYNC_CSID_LEN; |
| max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE; |
| max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN; |
| syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync"); |
| } |
| #endif |
| #ifdef USE_OPENAIS |
| if (!clops) |
| if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && |
| (clops = init_openais_cluster())) { |
| max_csid_len = OPENAIS_CSID_LEN; |
| max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE; |
| max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN; |
| syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS"); |
| } |
| #endif |
| #ifdef USE_SINGLENODE |
| if (!clops) |
| if (cluster_iface == IF_SINGLENODE && (clops = init_singlenode_cluster())) { |
| max_csid_len = SINGLENODE_CSID_LEN; |
| max_cluster_message = SINGLENODE_MAX_CLUSTER_MESSAGE; |
| max_cluster_member_name_len = MAX_CLUSTER_MEMBER_NAME_LEN; |
| syslog(LOG_NOTICE, "Cluster LVM daemon started - running in single-node mode"); |
| } |
| #endif |
| |
| if (!clops) { |
| DEBUGLOG("Can't initialise cluster interface\n"); |
| log_error("Can't initialise cluster interface."); |
| child_init_signal_and_exit(DFAIL_CLUSTER_IF); |
| /* NOTREACHED */ |
| } |
| DEBUGLOG("Cluster ready, doing some more initialisation\n"); |
| |
| /* Save our CSID */ |
| clops->get_our_csid(our_csid); |
| |
| /* Initialise the FD list head */ |
| local_client_head.fd = clops->get_main_cluster_fd(); |
| local_client_head.type = CLUSTER_MAIN_SOCK; |
| local_client_head.callback = clops->cluster_fd_callback; |
| |
| /* Add the local socket to the list */ |
| if (!(newfd = dm_zalloc(sizeof(struct local_client)))) { |
| child_init_signal_and_exit(DFAIL_MALLOC); |
| /* NOTREACHED */ |
| } |
| |
| newfd->fd = local_sock; |
| newfd->type = LOCAL_RENDEZVOUS; |
| newfd->callback = local_rendezvous_callback; |
| newfd->next = local_client_head.next; |
| local_client_head.next = newfd; |
| |
| /* This needs to be started after cluster initialisation |
| as it may need to take out locks */ |
| DEBUGLOG("Starting LVM thread\n"); |
| DEBUGLOG("Main cluster socket fd %d (%p) with local socket %d (%p)\n", |
| local_client_head.fd, &local_client_head, newfd->fd, newfd); |
| |
| /* Don't let anyone else to do work until we are started */ |
| if (pthread_create(&lvm_thread, &stack_attr, lvm_thread_fn, &lvm_params)) { |
| log_sys_error("pthread_create", ""); |
| goto out; |
| } |
| |
| /* Don't start until the LVM thread is ready */ |
| pthread_barrier_wait(&lvm_start_barrier); |
| |
| /* Tell the rest of the cluster our version number */ |
| if (clops->cluster_init_completed) |
| clops->cluster_init_completed(); |
| |
| DEBUGLOG("clvmd ready for work\n"); |
| child_init_signal(SUCCESS); |
| |
| /* Try to shutdown neatly */ |
| signal(SIGTERM, sigterm_handler); |
| signal(SIGINT, sigterm_handler); |
| |
| /* Do some work */ |
| main_loop(cmd_timeout); |
| |
| pthread_mutex_lock(&lvm_thread_mutex); |
| lvm_thread_exit = 1; |
| pthread_cond_signal(&lvm_thread_cond); |
| pthread_mutex_unlock(&lvm_thread_mutex); |
| if ((errno = pthread_join(lvm_thread, NULL))) |
| log_sys_error("pthread_join", ""); |
| |
| close_local_sock(local_sock); |
| |
| while ((delfd = local_client_head.next)) { |
| local_client_head.next = delfd->next; |
| /* Failing cleanup_zombie leaks... */ |
| if (delfd->type == LOCAL_SOCK && !cleanup_zombie(delfd)) |
| cmd_client_cleanup(delfd); /* calls sync_unlock */ |
| if (delfd->fd != local_sock) |
| safe_close(&(delfd->fd)); |
| dm_free(delfd); |
| } |
| |
| DEBUGLOG("cluster_closedown\n"); |
| destroy_lvhash(); |
| clops->cluster_closedown(); |
| |
| ret = 0; |
| out: |
| dm_hash_destroy(lvm_params.excl_uuid); |
| |
| return ret; |
| } |
| |
/* Called when the cluster layer has completed initialisation.
   We send the version message (via send_version_message()) */
void clvmd_cluster_init_completed(void)
{
	send_version_message();
}
| |
/*
 * Data on a connected socket: callback for a LOCAL_SOCK client.  It never
 * creates a new client, and returns read_from_local_sock()'s result
 * (main_loop() removes the client on a value <= 0).
 */
static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
			       const char *csid,
			       struct local_client **new_client)
{
	*new_client = NULL;
	return read_from_local_sock(thisfd);
}
| |
| /* Data on a connected socket */ |
| static int local_rendezvous_callback(struct local_client *thisfd, char *buf, |
| int len, const char *csid, |
| struct local_client **new_client) |
| { |
| /* Someone connected to our local socket, accept it. */ |
| |
| struct sockaddr_un socka; |
| struct local_client *newfd; |
| socklen_t sl = sizeof(socka); |
| int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl); |
| |
| if (client_fd == -1 && errno == EINTR) |
| return 1; |
| |
| if (client_fd >= 0) { |
| if (!(newfd = dm_zalloc(sizeof(*newfd)))) { |
| if (close(client_fd)) |
| log_sys_error("close", "socket"); |
| return 1; |
| } |
| |
| pthread_cond_init(&newfd->bits.localsock.cond, NULL); |
| pthread_mutex_init(&newfd->bits.localsock.mutex, NULL); |
| |
| if (fcntl(client_fd, F_SETFD, 1)) |
| DEBUGLOG("Setting CLOEXEC on client fd failed: %s\n", strerror(errno)); |
| |
| newfd->fd = client_fd; |
| newfd->type = LOCAL_SOCK; |
| newfd->callback = local_sock_callback; |
| newfd->bits.localsock.all_success = 1; |
| DEBUGLOG("Got new connection on fd %d (%p)\n", newfd->fd, newfd); |
| *new_client = newfd; |
| } |
| return 1; |
| } |
| |
| static int local_pipe_callback(struct local_client *thisfd, char *buf, |
| int maxlen, const char *csid, |
| struct local_client **new_client) |
| { |
| int len; |
| char buffer[PIPE_BUF]; |
| struct local_client *sock_client = thisfd->bits.pipe.client; |
| int status = -1; /* in error by default */ |
| |
| len = read(thisfd->fd, buffer, sizeof(int)); |
| if (len == -1 && errno == EINTR) |
| return 1; |
| |
| if (len == sizeof(int)) |
| memcpy(&status, buffer, sizeof(int)); |
| |
| DEBUGLOG("Read on pipe %d, %d bytes, status %d\n", |
| thisfd->fd, len, status); |
| |
| /* EOF on pipe or an error, close it */ |
| if (len <= 0) { |
| void *ret = &status; |
| if (close(thisfd->fd)) |
| log_sys_error("close", "local_pipe"); |
| |
| /* Clear out the cross-link */ |
| if (thisfd->bits.pipe.client) |
| thisfd->bits.pipe.client->bits.localsock.pipe_client = NULL; |
| |
| /* Reap child thread */ |
| if (thisfd->bits.pipe.threadid) { |
| if ((errno = pthread_join(thisfd->bits.pipe.threadid, &ret))) |
| log_sys_error("pthread_join", ""); |
| |
| thisfd->bits.pipe.threadid = 0; |
| if (thisfd->bits.pipe.client) |
| thisfd->bits.pipe.client->bits.localsock.threadid = 0; |
| } |
| return -1; |
| } else { |
| DEBUGLOG("Background routine status was %d, sock_client (%p)\n", |
| status, sock_client); |
| /* But has the client gone away ?? */ |
| if (!sock_client) { |
| DEBUGLOG("Got pipe response for dead client, ignoring it\n"); |
| } else { |
| /* If error then just return that code */ |
| if (status) |
| send_local_reply(sock_client, status, |
| sock_client->fd); |
| else { |
| /* FIXME: closer inspect this code since state is write thread protected */ |
| pthread_mutex_lock(&sock_client->bits.localsock.mutex); |
| if (sock_client->bits.localsock.state == POST_COMMAND) { |
| pthread_mutex_unlock(&sock_client->bits.localsock.mutex); |
| send_local_reply(sock_client, 0, |
| sock_client->fd); |
| } else { |
| /* PRE_COMMAND finished. */ |
| pthread_mutex_unlock(&sock_client->bits.localsock.mutex); |
| if ((status = distribute_command(sock_client))) |
| send_local_reply(sock_client, EFBIG, |
| sock_client->fd); |
| } |
| } |
| } |
| } |
| return len; |
| } |
| |
| /* If a noed is up, look for it in the reply array, if it's not there then |
| add one with "ETIMEDOUT". |
| NOTE: This won't race with real replies because they happen in the same thread. |
| */ |
| static void timedout_callback(struct local_client *client, const char *csid, |
| int node_up) |
| { |
| struct node_reply *reply; |
| char nodename[max_cluster_member_name_len]; |
| |
| if (!node_up) |
| return; |
| |
| clops->name_from_csid(csid, nodename); |
| DEBUGLOG("Checking for a reply from %s\n", nodename); |
| pthread_mutex_lock(&client->bits.localsock.mutex); |
| |
| reply = client->bits.localsock.replies; |
| while (reply && strcmp(reply->node, nodename) != 0) |
| reply = reply->next; |
| |
| pthread_mutex_unlock(&client->bits.localsock.mutex); |
| |
| if (!reply) { |
| DEBUGLOG("Node %s timed-out\n", nodename); |
| add_reply_to_list(client, ETIMEDOUT, csid, |
| "Command timed out", 18); |
| } |
| } |
| |
| /* Called when the request has timed out on at least one node. We fill in |
| the remaining node entries with ETIMEDOUT and return. |
| |
| By the time we get here the node that caused |
| the timeout could have gone down, in which case we will never get the expected |
| number of replies that triggers the post command so we need to do it here |
| */ |
| static void request_timed_out(struct local_client *client) |
| { |
| DEBUGLOG("Request timed-out. padding\n"); |
| clops->cluster_do_node_callback(client, timedout_callback); |
| |
| if (!client->bits.localsock.threadid) |
| return; |
| |
| pthread_mutex_lock(&client->bits.localsock.mutex); |
| |
| if (!client->bits.localsock.finished && |
| (client->bits.localsock.num_replies != |
| client->bits.localsock.expected_replies)) { |
| /* Post-process the command */ |
| client->bits.localsock.state = POST_COMMAND; |
| pthread_cond_signal(&client->bits.localsock.cond); |
| } |
| |
| pthread_mutex_unlock(&client->bits.localsock.mutex); |
| } |
| |
/*
 * This is where the real work happens.
 *
 * Event loop of the daemon.  It runs until quit is set, or until the
 * callback of a CLUSTER_MAIN_SOCK/CLUSTER_INTERNAL fd reports an error or
 * EOF.  Each iteration:
 *  - clients marked removeme, for which cleanup_zombie() returns 0, are
 *    unlinked and queued to the LVM thread (which also frees them);
 *  - select() waits on the cluster fd and the remaining client fds
 *    (LOCAL_RENDEZVOUS/LOCAL_SOCK fds are skipped while not quorate);
 *  - each ready fd's callback runs; clients it creates are linked in and
 *    failed clients are marked removeme;
 *  - if select() timed out, LOCAL_SOCK requests outstanding for more than
 *    cmd_timeout seconds are failed via request_timed_out().
 */
static void main_loop(int cmd_timeout)
{
	sigset_t ss;

	DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);

	/* SIGINT/SIGTERM were blocked in main(); let this thread receive them. */
	sigemptyset(&ss);
	sigaddset(&ss, SIGINT);
	sigaddset(&ss, SIGTERM);
	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
	/* Main loop */
	while (!quit) {
		fd_set in;
		int select_status;
		struct local_client *thisfd;
		struct timeval tv = { cmd_timeout, 0 };
		int quorate = clops->is_quorate();
		int client_count = 0;
		int max_fd = 0;
		struct local_client *lastfd = &local_client_head;
		struct local_client *nextfd = local_client_head.next;

		/* Wait on the cluster FD and all local sockets/pipes */
		local_client_head.fd = clops->get_main_cluster_fd();
		FD_ZERO(&in);

		for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next) {
			client_count++;
			max_fd = max(max_fd, thisfd->fd);
		}

		/* fd_set cannot hold descriptors >= FD_SETSIZE; warn well before that. */
		if (max_fd > FD_SETSIZE - 32) {
			fprintf(stderr, "WARNING: There are too many connections to clvmd. Investigate and take action now!\n");
			fprintf(stderr, "WARNING: Your cluster may freeze up if the number of clvmd file descriptors (%d) exceeds %d.\n", max_fd + 1, FD_SETSIZE);
		}

		/* nextfd is captured before the body so thisfd can be unlinked safely. */
		for (thisfd = &local_client_head; thisfd; thisfd = nextfd, nextfd = thisfd ? thisfd->next : NULL) {

			if (thisfd->removeme && !cleanup_zombie(thisfd)) {
				struct local_client *free_fd = thisfd;
				lastfd->next = nextfd;
				DEBUGLOG("removeme set for %p with %d monitored fds remaining\n", free_fd, client_count - 1);

				/* Queue cleanup, this also frees the client struct */
				add_to_lvmqueue(free_fd, NULL, 0, NULL);

				continue;
			}

			lastfd = thisfd;

			/* Still being cleaned up: keep it linked but don't poll it. */
			if (thisfd->removeme)
				continue;

			/* if the cluster is not quorate then don't listen for new requests */
			if ((thisfd->type != LOCAL_RENDEZVOUS &&
			     thisfd->type != LOCAL_SOCK) || quorate)
				if (thisfd->fd < FD_SETSIZE)
					FD_SET(thisfd->fd, &in);
		}

		select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);

		/* Handle a pending SIGHUP without disturbing select()'s errno. */
		if (reread_config) {
			int saved_errno = errno;

			reread_config = 0;
			DEBUGLOG("got SIGHUP\n");
			if (clops->reread_config)
				clops->reread_config();
			errno = saved_errno;
		}

		if (select_status > 0) {
			char csid[MAX_CSID_LEN];
			char buf[max_cluster_message];

			for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next) {
				if (thisfd->fd < FD_SETSIZE && FD_ISSET(thisfd->fd, &in)) {
					struct local_client *newfd = NULL;
					int ret;

					/* FIXME Remove from main thread in case it blocks! */
					/* Do callback */
					ret = thisfd->callback(thisfd, buf, sizeof(buf),
							       csid, &newfd);
					/* Ignore EAGAIN */
					if (ret < 0 && (errno == EAGAIN || errno == EINTR)) {
						continue;
					}

					/* Got error or EOF: Remove it from the list safely */
					if (ret <= 0) {
						int type = thisfd->type;

						/* If the cluster socket shuts down, so do we */
						if (type == CLUSTER_MAIN_SOCK ||
						    type == CLUSTER_INTERNAL)
							goto closedown;

						DEBUGLOG("ret == %d, errno = %d. removing client\n",
							 ret, errno);
						thisfd->removeme = 1;
						continue;
					}

					/*
					 * New client...simply add it to the list.  Advancing
					 * thisfd past it skips it this round; its fd was not
					 * part of this select().
					 */
					if (newfd) {
						newfd->next = thisfd->next;
						thisfd->next = newfd;
						thisfd = newfd;
					}
				}
			}
		}

		/* Select timed out. Check for clients that have been waiting too long for a response */
		if (select_status == 0) {
			time_t the_time = time(NULL);

			for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next) {
				if (thisfd->type == LOCAL_SOCK &&
				    thisfd->bits.localsock.sent_out &&
				    (thisfd->bits.localsock.sent_time + cmd_timeout) < the_time &&
				    thisfd->bits.localsock.expected_replies !=
				    thisfd->bits.localsock.num_replies) {
					/* Send timed out message + replies we already have */
					DEBUGLOG("Request timed-out (send: %ld, now: %ld)\n",
						 thisfd->bits.localsock.sent_time, the_time);

					thisfd->bits.localsock.all_success = 0;

					request_timed_out(thisfd);
				}
			}
		}
		if (select_status < 0) {
			if (errno == EINTR)
				continue;

			/*
			 * Only fatal in DEBUG builds; otherwise the loop simply
			 * retries.  NOTE(review): a persistent select() error
			 * would make this loop spin - consider logging it.
			 */
#ifdef DEBUG
			perror("select error");
			exit(-1);
#endif
		}
	}

closedown:
	if (quit)
		DEBUGLOG("SIGTERM received\n");
}
| |
| static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout) |
| { |
| int child_status; |
| fd_set fds; |
| struct timeval tv = {timeout, 0}; |
| |
| FD_ZERO(&fds); |
| FD_SET(c_pipe, &fds); |
| |
| switch (select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL)) { |
| case 0: |
| fprintf(stderr, "clvmd startup timed out\n"); |
| exit(DFAIL_TIMEOUT); |
| case 1: |
| if (read(c_pipe, &child_status, sizeof(child_status)) != |
| sizeof(child_status)) { |
| fprintf(stderr, "clvmd failed in initialisation\n"); |
| exit(DFAIL_INIT); |
| } |
| |
| switch (child_status) { |
| case SUCCESS: |
| break; |
| case DFAIL_INIT: |
| fprintf(stderr, "clvmd failed in initialisation\n"); |
| break; |
| case DFAIL_LOCAL_SOCK: |
| fprintf(stderr, "clvmd could not create local socket\n"); |
| fprintf(stderr, "Another clvmd is probably already running\n"); |
| break; |
| case DFAIL_CLUSTER_IF: |
| fprintf(stderr, "clvmd could not connect to cluster manager\n"); |
| fprintf(stderr, "Consult syslog for more information\n"); |
| break; |
| case DFAIL_MALLOC: |
| fprintf(stderr, "clvmd failed, not enough memory\n"); |
| break; |
| default: |
| fprintf(stderr, "clvmd failed, error was %d\n", child_status); |
| break; |
| } |
| exit(child_status); |
| default: |
| fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno)); |
| exit(DFAIL_INIT); |
| } |
| } |
| |
| /* |
| * Fork into the background and detach from our parent process. |
| * In the interests of user-friendliness we wait for the daemon |
| * to complete initialisation before returning its status |
| * the the user. |
| */ |
| static void be_daemon(int timeout) |
| { |
| int devnull = open("/dev/null", O_RDWR); |
| if (devnull == -1) { |
| perror("Can't open /dev/null"); |
| exit(3); |
| } |
| |
| if (pipe(child_pipe)) { |
| perror("Error creating pipe"); |
| exit(3); |
| } |
| |
| switch (fork()) { |
| case -1: |
| perror("clvmd: can't fork"); |
| exit(2); |
| |
| case 0: /* Child */ |
| (void) close(child_pipe[0]); |
| break; |
| |
| default: /* Parent */ |
| (void) close(child_pipe[1]); |
| wait_for_child(child_pipe[0], timeout); |
| } |
| |
| /* Detach ourself from the calling environment */ |
| if (close(0) || close(1) || close(2)) { |
| perror("Error closing terminal FDs"); |
| exit(4); |
| } |
| setsid(); |
| |
| if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0 |
| || dup2(devnull, 2) < 0) { |
| perror("Error setting terminal FDs to /dev/null"); |
| log_error("Error setting terminal FDs to /dev/null: %m"); |
| exit(5); |
| } |
| if ((devnull > STDERR_FILENO) && close(devnull)) { |
| log_sys_error("close", "/dev/null"); |
| exit(7); |
| } |
| if (chdir("/")) { |
| log_error("Error setting current directory to /: %m"); |
| exit(6); |
| } |
| } |
| |
| static int verify_message(char *buf, int len) |
| { |
| struct clvm_header *h = (struct clvm_header *)buf; |
| |
| if (len < (int)sizeof(struct clvm_header)) { |
| log_error("verify_message short len %d.", len); |
| return -1; |
| } |
| |
| switch (h->cmd) { |
| case CLVMD_CMD_REPLY: |
| case CLVMD_CMD_VERSION: |
| case CLVMD_CMD_GOAWAY: |
| case CLVMD_CMD_TEST: |
| case CLVMD_CMD_LOCK: |
| case CLVMD_CMD_UNLOCK: |
| case CLVMD_CMD_LOCK_LV: |
| case CLVMD_CMD_LOCK_VG: |
| case CLVMD_CMD_LOCK_QUERY: |
| case CLVMD_CMD_REFRESH: |
| case CLVMD_CMD_GET_CLUSTERNAME: |
| case CLVMD_CMD_SET_DEBUG: |
| case CLVMD_CMD_VG_BACKUP: |
| case CLVMD_CMD_RESTART: |
| case CLVMD_CMD_SYNC_NAMES: |
| break; |
| default: |
| log_error("verify_message bad cmd %x.", h->cmd); |
| return -1; |
| } |
| |
| /* TODO: we may be able to narrow len/flags/clientid/arglen checks based on cmd */ |
| |
| if (h->flags & ~(CLVMD_FLAG_LOCAL | CLVMD_FLAG_SYSTEMLV | CLVMD_FLAG_NODEERRS | CLVMD_FLAG_REMOTE)) { |
| log_error("verify_message bad flags %x.", h->flags); |
| return -1; |
| } |
| |
| if (h->arglen > max_cluster_message) { |
| log_error("verify_message bad arglen %x max %d.", h->arglen, max_cluster_message); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
/*
 * Hex-dump at most the first 128 bytes of 'buf' to the error log,
 * eight bytes per line followed by their printable characters in
 * brackets (non-printable bytes are shown as spaces).
 */
static void dump_message(char *buf, int len)
{
	unsigned char row[8];
	char str[9];
	int i, j = 0;

	str[8] = '\0';
	if (len > 128)
		len = 128;

	for (i = 0; i < len; ++i) {
		/* <ctype.h> functions need an unsigned char value (or EOF) */
		unsigned char c = (unsigned char) buf[i];

		row[j] = c;
		str[j] = isprint(c) ? buf[i] : ' ';

		/* Flush a line once 8 bytes are collected or the input ends */
		if (++j == 8 || i + 1 == len) {
			/* Pad a short final line */
			for (; j < 8; ++j) {
				row[j] = 0;
				str[j] = ' ';
			}

			log_error("%02x %02x %02x %02x %02x %02x %02x %02x [%s]",
				  row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], str);
			j = 0;
		}
	}
}
| |
| static int cleanup_zombie(struct local_client *thisfd) |
| { |
| int *status; |
| struct local_client *pipe_client; |
| |
| if (thisfd->type != LOCAL_SOCK) |
| return 0; |
| |
| if (!thisfd->bits.localsock.cleanup_needed) |
| return 0; |
| |
| DEBUGLOG("EOF on local socket: inprogress=%d\n", |
| thisfd->bits.localsock.in_progress); |
| |
| if ((pipe_client = thisfd->bits.localsock.pipe_client)) |
| pipe_client = pipe_client->bits.pipe.client; |
| |
| /* If the client went away in mid command then tidy up */ |
| if (thisfd->bits.localsock.in_progress) { |
| DEBUGLOG("Sending SIGUSR2 to pre&post thread (%p in-progress)\n", pipe_client); |
| pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2); |
| if (pthread_mutex_trylock(&thisfd->bits.localsock.mutex)) |
| return 1; |
| thisfd->bits.localsock.state = POST_COMMAND; |
| thisfd->bits.localsock.finished = 1; |
| pthread_cond_signal(&thisfd->bits.localsock.cond); |
| pthread_mutex_unlock(&thisfd->bits.localsock.mutex); |
| |
| /* Free any unsent buffers */ |
| free_reply(thisfd); |
| } |
| |
| /* Kill the subthread & free resources */ |
| if (thisfd->bits.localsock.threadid) { |
| DEBUGLOG("Waiting for pre&post thread (%p)\n", pipe_client); |
| pthread_mutex_lock(&thisfd->bits.localsock.mutex); |
| thisfd->bits.localsock.state = PRE_COMMAND; |
| thisfd->bits.localsock.finished = 1; |
| pthread_cond_signal(&thisfd->bits.localsock.cond); |
| pthread_mutex_unlock(&thisfd->bits.localsock.mutex); |
| |
| if ((errno = pthread_join(thisfd->bits.localsock.threadid, |
| (void **) &status))) |
| log_sys_error("pthread_join", ""); |
| |
| DEBUGLOG("Joined pre&post thread\n"); |
| |
| thisfd->bits.localsock.threadid = 0; |
| |
| /* Remove the pipe client */ |
| if (thisfd->bits.localsock.pipe_client) { |
| struct local_client *delfd; |
| struct local_client *lastfd; |
| |
| (void) close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */ |
| (void) close(thisfd->bits.localsock.pipe); |
| |
| /* Remove pipe client */ |
| for (lastfd = &local_client_head; (delfd = lastfd->next); lastfd = delfd) |
| if (thisfd->bits.localsock.pipe_client == delfd) { |
| thisfd->bits.localsock.pipe_client = NULL; |
| lastfd->next = delfd->next; |
| dm_free(delfd); |
| break; |
| } |
| } |
| } |
| |
| /* Free the command buffer */ |
| dm_free(thisfd->bits.localsock.cmd); |
| |
| safe_close(&(thisfd->fd)); |
| thisfd->bits.localsock.cleanup_needed = 0; |
| |
| return 0; |
| } |
| |
| /* Called when we have a read from the local socket. |
| was in the main loop but it's grown up and is a big girl now */ |
| static int read_from_local_sock(struct local_client *thisfd) |
| { |
| int len; |
| int argslen; |
| int missing_len; |
| char buffer[PIPE_BUF + 1]; |
| char csid[MAX_CSID_LEN]; |
| int comms_pipe[2]; |
| struct local_client *newfd; |
| struct clvm_header *inheader = (struct clvm_header *) buffer; |
| int status; |
| |
| len = read(thisfd->fd, buffer, sizeof(buffer) - 1); |
| if (len == -1 && errno == EINTR) |
| return 1; |
| |
| DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len); |
| |
| if (len && verify_message(buffer, len) < 0) { |
| log_error("read_from_local_sock from %d len %d bad verify.", |
| thisfd->fd, len); |
| dump_message(buffer, len); |
| /* force error handling below */ |
| len = 0; |
| } |
| |
| /* EOF or error on socket */ |
| if (len <= 0) { |
| thisfd->bits.localsock.cleanup_needed = 1; |
| (void) cleanup_zombie(thisfd); /* ignore errors here */ |
| return 0; |
| } |
| |
| buffer[len] = 0; /* Ensure \0 terminated */ |
| |
| /* Fill in the client ID */ |
| inheader->clientid = htonl(thisfd->fd); |
| |
| /* If we are already busy then return an error */ |
| if (thisfd->bits.localsock.in_progress) { |
| struct clvm_header reply = { |
| .cmd = CLVMD_CMD_REPLY, |
| .status = EBUSY |
| }; |
| send_message(&reply, sizeof(reply), our_csid, thisfd->fd, |
| "Error sending EBUSY reply to local user"); |
| return len; |
| } |
| |
| /* See if we have the whole message */ |
| argslen = len - strlen(inheader->node) - sizeof(struct clvm_header); |
| missing_len = inheader->arglen - argslen; |
| |
| if (missing_len < 0) |
| missing_len = 0; |
| |
| /* We need at least sizeof(struct clvm_header) bytes in buffer */ |
| if (len < (int)sizeof(struct clvm_header) || /* Already handled in verify_message() */ |
| argslen < 0 || missing_len > MAX_MISSING_LEN) { |
| struct clvm_header reply = { |
| .cmd = CLVMD_CMD_REPLY, |
| .status = EINVAL |
| }; |
| send_message(&reply, sizeof(reply), our_csid, thisfd->fd, |
| "Error sending EINVAL reply to local user"); |
| return 0; |
| } |
| |
| /* Free any old buffer space */ |
| dm_free(thisfd->bits.localsock.cmd); |
| |
| /* Save the message */ |
| if (!(thisfd->bits.localsock.cmd = dm_malloc(len + missing_len))) { |
| struct clvm_header reply = { |
| .cmd = CLVMD_CMD_REPLY, |
| .status = ENOMEM |
| }; |
| send_message(&reply, sizeof(reply), our_csid, thisfd->fd, |
| "Error sending ENOMEM reply to local user"); |
| return 0; |
| } |
| memcpy(thisfd->bits.localsock.cmd, buffer, len); |
| thisfd->bits.localsock.cmd_len = len + missing_len; |
| inheader = (struct clvm_header *) thisfd->bits.localsock.cmd; |
| |
| /* If we don't have the full message then read the rest now */ |
| if (missing_len) { |
| char *argptr = inheader->node + strlen(inheader->node) + 1; |
| |
| while (missing_len > 0) { |
| DEBUGLOG("got %d bytes, need another %d (total %d)\n", |
| argslen, missing_len, inheader->arglen); |
| len = read(thisfd->fd, argptr + argslen, missing_len); |
| if (len == -1 && errno == EINTR) |
| continue; |
| |
| if (len <= 0) { |
| /* EOF or error on socket */ |
| DEBUGLOG("EOF on local socket\n"); |
| dm_free(thisfd->bits.localsock.cmd); |
| thisfd->bits.localsock.cmd = NULL; |
| return 0; |
| } |
| |
| missing_len -= len; |
| argslen += len; |
| } |
| } |
| |
| /* Only run the command if all the cluster nodes are running CLVMD */ |
| if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) && |
| (check_all_clvmds_running(thisfd) == -1)) { |
| thisfd->bits.localsock.expected_replies = 0; |
| thisfd->bits.localsock.num_replies = 0; |
| send_local_reply(thisfd, EHOSTDOWN, thisfd->fd); |
| return len; |
| } |
| |
| /* Check the node name for validity */ |
| if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) { |
| /* Error, node is not in the cluster */ |
| struct clvm_header reply = { |
| .cmd = CLVMD_CMD_REPLY, |
| .status = ENOENT |
| }; |
| |
| DEBUGLOG("Unknown node: '%s'\n", inheader->node); |
| send_message(&reply, sizeof(reply), our_csid, thisfd->fd, |
| "Error sending ENOENT reply to local user"); |
| thisfd->bits.localsock.expected_replies = 0; |
| thisfd->bits.localsock.num_replies = 0; |
| thisfd->bits.localsock.in_progress = FALSE; |
| thisfd->bits.localsock.sent_out = FALSE; |
| return len; |
| } |
| |
| /* If we already have a subthread then just signal it to start */ |
| if (thisfd->bits.localsock.threadid) { |
| pthread_mutex_lock(&thisfd->bits.localsock.mutex); |
| thisfd->bits.localsock.state = PRE_COMMAND; |
| pthread_cond_signal(&thisfd->bits.localsock.cond); |
| pthread_mutex_unlock(&thisfd->bits.localsock.mutex); |
| return len; |
| } |
| |
| /* Create a pipe and add the reading end to our FD list */ |
| if (pipe(comms_pipe)) { |
| struct clvm_header reply = { |
| .cmd = CLVMD_CMD_REPLY, |
| .status = EBUSY |
| }; |
| |
| DEBUGLOG("Creating pipe failed: %s\n", strerror(errno)); |
| send_message(&reply, sizeof(reply), our_csid, thisfd->fd, |
| "Error sending EBUSY reply to local user"); |
| return len; |
| } |
| |
| if (!(newfd = dm_zalloc(sizeof(*newfd)))) { |
| struct clvm_header reply = { |
| .cmd = CLVMD_CMD_REPLY, |
| .status = ENOMEM |
| }; |
| |
| (void) close(comms_pipe[0]); |
| (void) close(comms_pipe[1]); |
| |
| send_message(&reply, sizeof(reply), our_csid, thisfd->fd, |
| "Error sending ENOMEM reply to local user"); |
| return len; |
| } |
| |
| DEBUGLOG("Creating pipe, [%d, %d]\n", comms_pipe[0], comms_pipe[1]); |
| |
| if (fcntl(comms_pipe[0], F_SETFD, 1)) |
| DEBUGLOG("setting CLOEXEC on pipe[0] failed: %s\n", strerror(errno)); |
| if (fcntl(comms_pipe[1], F_SETFD, 1)) |
| DEBUGLOG("setting CLOEXEC on pipe[1] failed: %s\n", strerror(errno)); |
| |
| newfd->fd = comms_pipe[0]; |
| newfd->type = THREAD_PIPE; |
| newfd->callback = local_pipe_callback; |
| newfd->bits.pipe.client = thisfd; |
| newfd->next = thisfd->next; |
| thisfd->next = newfd; |
| |
| /* Store a cross link to the pipe */ |
| thisfd->bits.localsock.pipe_client = newfd; |
| thisfd->bits.localsock.pipe = comms_pipe[1]; |
| |
| /* Make sure the thread has a copy of it's own ID */ |
| newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid; |
| |
| /* Run the pre routine */ |
| thisfd->bits.localsock.in_progress = TRUE; |
| thisfd->bits.localsock.state = PRE_COMMAND; |
| thisfd->bits.localsock.cleanup_needed = 1; |
| DEBUGLOG("Creating pre&post thread for pipe fd %d (%p)\n", newfd->fd, newfd); |
| status = pthread_create(&thisfd->bits.localsock.threadid, |
| &stack_attr, pre_and_post_thread, thisfd); |
| DEBUGLOG("Created pre&post thread, state = %d\n", status); |
| |
| return len; |
| } |
| |
| /* Add a file descriptor from the cluster or comms interface to |
| our list of FDs for select |
| */ |
| int add_client(struct local_client *new_client) |
| { |
| new_client->next = local_client_head.next; |
| local_client_head.next = new_client; |
| |
| return 0; |
| } |
| |
| /* Called when the pre-command has completed successfully - we |
| now execute the real command on all the requested nodes */ |
| static int distribute_command(struct local_client *thisfd) |
| { |
| struct clvm_header *inheader = |
| (struct clvm_header *) thisfd->bits.localsock.cmd; |
| int len = thisfd->bits.localsock.cmd_len; |
| |
| thisfd->xid = global_xid++; |
| DEBUGLOG("distribute command: XID = %d, flags=0x%x (%s%s)\n", |
| thisfd->xid, inheader->flags, |
| (inheader->flags & CLVMD_FLAG_LOCAL) ? "LOCAL" : "", |
| (inheader->flags & CLVMD_FLAG_REMOTE) ? "REMOTE" : ""); |
| |
| /* Forward it to other nodes in the cluster if needed */ |
| if (!(inheader->flags & CLVMD_FLAG_LOCAL)) { |
| /* if node is empty then do it on the whole cluster */ |
| if (inheader->node[0] == '\0') { |
| thisfd->bits.localsock.expected_replies = |
| clops->get_num_nodes(); |
| thisfd->bits.localsock.num_replies = 0; |
| thisfd->bits.localsock.sent_time = time(NULL); |
| thisfd->bits.localsock.in_progress = TRUE; |
| thisfd->bits.localsock.sent_out = TRUE; |
| |
| /* |
| * Send to local node first, even if CLVMD_FLAG_REMOTE |
| * is set so we still get a reply if this is the |
| * only node. |
| */ |
| add_to_lvmqueue(thisfd, inheader, len, NULL); |
| |
| DEBUGLOG("Sending message to all cluster nodes\n"); |
| inheader->xid = thisfd->xid; |
| send_message(inheader, len, NULL, -1, |
| "Error forwarding message to cluster"); |
| } else { |
| /* Do it on a single node */ |
| char csid[MAX_CSID_LEN]; |
| |
| if (clops->csid_from_name(csid, inheader->node)) |
| /* This has already been checked so should not happen */ |
| return 0; |
| |
| /* OK, found a node... */ |
| thisfd->bits.localsock.in_progress = TRUE; |
| thisfd->bits.localsock.expected_replies = 1; |
| thisfd->bits.localsock.num_replies = 0; |
| |
| /* Are we the requested node ?? */ |
| if (memcmp(csid, our_csid, max_csid_len) == 0) { |
| DEBUGLOG("Doing command on local node only\n"); |
| add_to_lvmqueue(thisfd, inheader, len, NULL); |
| } else { |
| DEBUGLOG("Sending message to single node: %s\n", |
| inheader->node); |
| inheader->xid = thisfd->xid; |
| send_message(inheader, len, csid, -1, |
| "Error forwarding message to cluster node"); |
| } |
| } |
| } else { |
| /* Local explicitly requested, ignore nodes */ |
| thisfd->bits.localsock.in_progress = TRUE; |
| thisfd->bits.localsock.expected_replies = 1; |
| thisfd->bits.localsock.num_replies = 0; |
| DEBUGLOG("Doing command explicitly on local node only\n"); |
| add_to_lvmqueue(thisfd, inheader, len, NULL); |
| } |
| |
| return 0; |
| } |
| |
| /* Process a command from a remote node and return the result */ |
| static void process_remote_command(struct clvm_header *msg, int msglen, int fd, |
| const char *csid) |
| { |
| char *replyargs; |
| char nodename[max_cluster_member_name_len]; |
| int replylen = 0; |
| int buflen = max_cluster_message - sizeof(struct clvm_header) - 1; |
| int status; |
| |
| /* Get the node name as we /may/ need it later */ |
| clops->name_from_csid(csid, nodename); |
| |
| DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n", |
| decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename); |
| |
| /* Check for GOAWAY and sulk */ |
| if (msg->cmd == CLVMD_CMD_GOAWAY) { |
| DEBUGLOG("Told to go away by %s\n", nodename); |
| log_error("Told to go away by %s.", nodename); |
| exit(99); |
| } |
| |
| /* Version check is internal - don't bother exposing it in clvmd-command.c */ |
| if (msg->cmd == CLVMD_CMD_VERSION) { |
| int version_nums[3]; |
| char node[256]; |
| |
| memcpy(version_nums, msg->args, sizeof(version_nums)); |
| |
| clops->name_from_csid(csid, node); |
| DEBUGLOG("Remote node %s is version %d.%d.%d\n", |
| node, ntohl(version_nums[0]), |
| ntohl(version_nums[1]), ntohl(version_nums[2])); |
| |
| if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) { |
| struct clvm_header byebyemsg = { |
| .cmd = CLVMD_CMD_GOAWAY |
| }; |
| |
| DEBUGLOG("Telling node %s to go away because of incompatible version number\n", |
| node); |
| log_notice("Telling node %s to go away because of incompatible version number %d.%d.%d\n", |
| node, ntohl(version_nums[0]), |
| ntohl(version_nums[1]), ntohl(version_nums[2])); |
| |
| clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg), our_csid, |
| "Error Sending GOAWAY message"); |
| } else |
| clops->add_up_node(csid); |
| |
| return; |
| } |
| |
| /* Allocate a default reply buffer */ |
| if ((replyargs = dm_malloc(max_cluster_message - sizeof(struct clvm_header)))) |
| /* Run the command */ |
| /* FIXME: usage of init_test() is unprotected */ |
| status = do_command(NULL, msg, msglen, &replyargs, |
| buflen, &replylen); |
| else |
| status = ENOMEM; |
| |
| /* If it wasn't a reply, then reply */ |
| if (msg->cmd != CLVMD_CMD_REPLY) { |
| char *aggreply; |
| |
| aggreply = dm_realloc(replyargs, replylen + sizeof(struct clvm_header)); |
| if (aggreply) { |
| struct clvm_header *agghead = |
| (struct clvm_header *) aggreply; |
| |
| replyargs = aggreply; |
| /* Move it up so there's room for a header in front of the data */ |
| memmove(aggreply + offsetof(struct clvm_header, args), |
| replyargs, replylen); |
| |
| agghead->xid = msg->xid; |
| agghead->cmd = CLVMD_CMD_REPLY; |
| agghead->status = status; |
| agghead->flags = 0; |
| agghead->clientid = msg->clientid; |
| agghead->arglen = replylen; |
| agghead->node[0] = '\0'; |
| send_message(aggreply, sizeof(struct clvm_header) + replylen, |
| csid, fd, "Error sending command reply"); |
| } else { |
| /* Return a failure response */ |
| struct clvm_header reply = { |
| .cmd = CLVMD_CMD_REPLY, |
| .status = ENOMEM, |
| .clientid = msg->clientid |
| }; |
| DEBUGLOG("Error attempting to realloc return buffer\n"); |
| send_message(&reply, sizeof(reply), csid, fd, |
| "Error sending ENOMEM command reply"); |
| } |
| } |
| |
| dm_free(replyargs); |
| } |
| |
| /* Add a reply to a command to the list of replies for this client. |
| If we have got a full set then send them to the waiting client down the local |
| socket */ |
| static void add_reply_to_list(struct local_client *client, int status, |
| const char *csid, const char *buf, int len) |
| { |
| struct node_reply *reply; |
| |
| /* Add it to the list of replies */ |
| if (!(reply = dm_zalloc(sizeof(*reply)))) { |
| /* It's all gone horribly wrong... */ |
| send_local_reply(client, ENOMEM, client->fd); |
| return; |
| } |
| |
| reply->status = status; |
| clops->name_from_csid(csid, reply->node); |
| DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len); |
| |
| if (len > 0) { |
| if (!(reply->replymsg = dm_malloc(len))) |
| reply->status = ENOMEM; |
| else |
| memcpy(reply->replymsg, buf, len); |
| } else |
| reply->replymsg = NULL; |
| |
| pthread_mutex_lock(&client->bits.localsock.mutex); |
| |
| if (client->bits.localsock.finished) { |
| dm_free(reply->replymsg); |
| dm_free(reply); |
| } else { |
| /* Hook it onto the reply chain */ |
| reply->next = client->bits.localsock.replies; |
| client->bits.localsock.replies = reply; |
| |
| /* If we have the whole lot then do the post-process */ |
| /* Post-process the command */ |
| if (++client->bits.localsock.num_replies == |
| client->bits.localsock.expected_replies) { |
| client->bits.localsock.state = POST_COMMAND; |
| pthread_cond_signal(&client->bits.localsock.cond); |
| } |
| DEBUGLOG("Got %d replies, expecting: %d\n", |
| client->bits.localsock.num_replies, |
| client->bits.localsock.expected_replies); |
| } |
| pthread_mutex_unlock(&client->bits.localsock.mutex); |
| } |
| |
/* This is the thread that runs the PRE and post commands for a particular connection */
/*
 * 'arg' is the LOCAL_SOCK client this thread serves.  For each command it
 * runs do_pre_command(), writes the int status to the pipe write end
 * (localsock.pipe, whose read end the main loop services via
 * local_pipe_callback), waits for state POST_COMMAND, runs
 * do_post_command() and writes a second (always 0) status.  It loops until
 * localsock.finished is set.
 *
 * The client's mutex is held for the whole loop; it is only released
 * inside pthread_cond_wait() and after the loop exits.
 */
static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
{
	struct local_client *client = (struct local_client *) arg;
	int status;
	int write_status;
	sigset_t ss;
	int pipe_fd = client->bits.localsock.pipe;

	DEBUGLOG("Pre&post thread (%p), pipe fd %d\n", client, pipe_fd);
	pthread_mutex_lock(&client->bits.localsock.mutex);

	/* Ignore SIGUSR1 (handled by master process) but enable
	   SIGUSR2 (kills subthreads) */
	sigemptyset(&ss);
	sigaddset(&ss, SIGUSR1);
	pthread_sigmask(SIG_BLOCK, &ss, NULL);

	sigdelset(&ss, SIGUSR1);
	sigaddset(&ss, SIGUSR2);
	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);

	/* Loop around doing PRE and POST functions until the client goes away */
	while (!client->bits.localsock.finished) {
		/* Execute the code */
		/* FIXME: usage of init_test() is unprotected as in do_command() */
		if ((status = do_pre_command(client)))
			client->bits.localsock.all_success = 0;

		DEBUGLOG("Pre&post thread (%p) writes status %d down to pipe fd %d\n",
			 client, status, pipe_fd);

		/* Tell the parent process we have finished this bit */
		/* Retry on EINTR/EAGAIN; give up (with a log) on a short
		   write or any other error */
		while ((write_status = write(pipe_fd, &status, sizeof(int))) != sizeof(int))
			if (write_status >=0 || (errno != EINTR && errno != EAGAIN)) {
				log_error("Error sending to pipe: %m");
				break;
			}

		/* A failed pre-command skips the command and the post step */
		if (status) {
			client->bits.localsock.state = POST_COMMAND;
			goto next_pre;
		}

		/* We may need to wait for the condition variable before running the post command */
		if (client->bits.localsock.state != POST_COMMAND &&
		    !client->bits.localsock.finished) {
			DEBUGLOG("Pre&post thread (%p) waiting to do post command, state = %d\n",
				 client, client->bits.localsock.state);
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}

		DEBUGLOG("Pre&post thread (%p) got post command condition...\n", client);

		/* POST function must always run, even if the client aborts */
		status = 0;
		do_post_command(client);

		while ((write_status = write(pipe_fd, &status, sizeof(int))) != sizeof(int))
			if (write_status >=0 || (errno != EINTR && errno != EAGAIN)) {
				log_error("Error sending to pipe: %m");
				break;
			}
next_pre:
		/* Sleep until the main thread asks for the next PRE_COMMAND
		   (or marks the client finished) */
		if (client->bits.localsock.state != PRE_COMMAND &&
		    !client->bits.localsock.finished) {
			DEBUGLOG("Pre&post thread (%p) waiting for next pre command\n", client);
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}

		DEBUGLOG("Pre&post thread (%p) got pre command condition...\n", client);
	}
	pthread_mutex_unlock(&client->bits.localsock.mutex);
	DEBUGLOG("Pre&post thread (%p) finished\n", client);

	pthread_exit(NULL);
}
| |
| /* Process a command on the local node and store the result */ |
| static int process_local_command(struct clvm_header *msg, int msglen, |
| struct local_client *client, |
| unsigned short xid) |
| { |
| char *replybuf; |
| int buflen = max_cluster_message - sizeof(struct clvm_header) - 1; |
| int replylen = 0; |
| int status; |
| |
| if (!(replybuf = dm_malloc(max_cluster_message))) |
| return -1; |
| |
| DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n", |
| decode_cmd(msg->cmd), msg, msglen, client); |
| |
| /* If remote flag is set, just set a successful status code. */ |
| if (msg->flags & CLVMD_FLAG_REMOTE) |
| status = 0; |
| else |
| status = do_command(client, msg, msglen, &replybuf, buflen, &replylen); |
| |
| if (status) |
| client->bits.localsock.all_success = 0; |
| |
| /* If we took too long then discard the reply */ |
| if (xid == client->xid) |
| add_reply_to_list(client, status, our_csid, replybuf, replylen); |
| else |
| DEBUGLOG("Local command took too long, discarding xid %d, current is %d\n", |
| xid, client->xid); |
| |
| dm_free(replybuf); |
| |
| return status; |
| } |
| |
| static int process_reply(const struct clvm_header *msg, int msglen, const char *csid) |
| { |
| struct local_client *client; |
| |
| if (!(client = find_client(msg->clientid))) { |
| DEBUGLOG("Got message for unknown client 0x%x\n", |
| msg->clientid); |
| log_error("Got message for unknown client 0x%x.", |
| msg->clientid); |
| return -1; |
| } |
| |
| if (msg->status) |
| client->bits.localsock.all_success = 0; |
| |
| /* Gather replies together for this client id */ |
| if (msg->xid == client->xid) |
| add_reply_to_list(client, msg->status, csid, msg->args, |
| msg->arglen); |
| else |
| DEBUGLOG("Discarding reply with old XID %d, current = %d\n", |
| msg->xid, client->xid); |
| |
| return 0; |
| } |
| |
| /* Send an aggregated reply back to the client */ |
| static void send_local_reply(struct local_client *client, int status, int fd) |
| { |
| struct clvm_header *clientreply; |
| struct node_reply *thisreply = client->bits.localsock.replies; |
| char *replybuf; |
| char *ptr; |
| int message_len = 0; |
| |
| DEBUGLOG("Send local reply\n"); |
| |
| /* Work out the total size of the reply */ |
| while (thisreply) { |
| if (thisreply->replymsg) |
| message_len += strlen(thisreply->replymsg) + 1; |
| else |
| message_len++; |
| |
| message_len += strlen(thisreply->node) + 1 + sizeof(int); |
| |
| thisreply = thisreply->next; |
| } |
| |
| /* Add in the size of our header */ |
| message_len = message_len + sizeof(struct clvm_header); |
| if (!(replybuf = dm_malloc(message_len))) { |
| DEBUGLOG("Memory allocation fails\n"); |
| return; |
| } |
| |
| clientreply = (struct clvm_header *) replybuf; |
| clientreply->status = status; |
| clientreply->cmd = CLVMD_CMD_REPLY; |
| clientreply->node[0] = '\0'; |
| clientreply->xid = 0; |
| clientreply->clientid = 0; |
| clientreply->flags = 0; |
| |
| ptr = clientreply->args; |
| |
| /* Add in all the replies, and free them as we go */ |
| thisreply = client->bits.localsock.replies; |
| while (thisreply) { |
| struct node_reply *tempreply = thisreply; |
| |
| strcpy(ptr, thisreply->node); |
| ptr += strlen(thisreply->node) + 1; |
| |
| if (thisreply->status) |
| clientreply->flags |= CLVMD_FLAG_NODEERRS; |
| |
| memcpy(ptr, &thisreply->status, sizeof(int)); |
| ptr += sizeof(int); |
| |
| if (thisreply->replymsg) { |
| strcpy(ptr, thisreply->replymsg); |
| ptr += strlen(thisreply->replymsg) + 1; |
| } else { |
| ptr[0] = '\0'; |
| ptr++; |
| } |
| thisreply = thisreply->next; |
| |
| dm_free(tempreply->replymsg); |
| dm_free(tempreply); |
| } |
| |
| /* Terminate with an empty node name */ |
| *ptr = '\0'; |
| |
| clientreply->arglen = ptr - clientreply->args; |
| |
| /* And send it */ |
| send_message(replybuf, message_len, our_csid, fd, |
| "Error sending REPLY to client"); |
| dm_free(replybuf); |
| |
| /* Reset comms variables */ |
| client->bits.localsock.replies = NULL; |
| client->bits.localsock.expected_replies = 0; |
| client->bits.localsock.in_progress = FALSE; |
| client->bits.localsock.sent_out = FALSE; |
| } |
| |
| /* Just free a reply chain baceuse it wasn't used. */ |
| static void free_reply(struct local_client *client) |
| { |
| /* Add in all the replies, and free them as we go */ |
| struct node_reply *thisreply = client->bits.localsock.replies; |
| while (thisreply) { |
| struct node_reply *tempreply = thisreply; |
| |
| thisreply = thisreply->next; |
| |
| dm_free(tempreply->replymsg); |
| dm_free(tempreply); |
| } |
| client->bits.localsock.replies = NULL; |
| } |
| |
| /* Send our version number to the cluster */ |
| static void send_version_message(void) |
| { |
| char message[sizeof(struct clvm_header) + sizeof(int) * 3]; |
| struct clvm_header *msg = (struct clvm_header *) message; |
| int version_nums[3] = { |
| htonl(CLVMD_MAJOR_VERSION), |
| htonl(CLVMD_MINOR_VERSION), |
| htonl(CLVMD_PATCH_VERSION) |
| }; |
| |
| msg->cmd = CLVMD_CMD_VERSION; |
| msg->status = 0; |
| msg->flags = 0; |
| msg->clientid = 0; |
| msg->arglen = sizeof(version_nums); |
| |
| memcpy(&msg->args, version_nums, sizeof(version_nums)); |
| |
| hton_clvm(msg); |
| |
| clops->cluster_send_message(message, sizeof(message), NULL, |
| "Error Sending version number"); |
| } |
| |
| /* Send a message to either a local client or another server */ |
| static int send_message(void *buf, int msglen, const char *csid, int fd, |
| const char *errtext) |
| { |
| int len = 0; |
| int ptr; |
| struct timespec delay; |
| struct timespec remtime; |
| int retry_cnt = 0; |
| |
| /* Send remote messages down the cluster socket */ |
| if (!csid || !ISLOCAL_CSID(csid)) { |
| hton_clvm((struct clvm_header *) buf); |
| return clops->cluster_send_message(buf, msglen, csid, errtext); |
| } |
| |
| /* Make sure it all goes */ |
| for (ptr = 0; ptr < msglen;) { |
| if ((len = write(fd, (char*)buf + ptr, msglen - ptr)) <= 0) { |
| if (errno == EINTR) |
| continue; |
| if ((errno == EAGAIN || errno == EIO || errno == ENOSPC) && |
| ++retry_cnt < MAX_RETRIES) { |
| delay.tv_sec = 0; |
| delay.tv_nsec = 100000; |
| remtime.tv_sec = 0; |
| remtime.tv_nsec = 0; |
| (void) nanosleep (&delay, &remtime); |
| continue; |
| } |
| log_error("%s", errtext); |
| break; |
| } |
| ptr += len; |
| } |
| |
| return len; |
| } |
| |
| static int process_work_item(struct lvm_thread_cmd *cmd) |
| { |
| /* If msg is NULL then this is a cleanup request */ |
| if (cmd->msg == NULL) { |
| DEBUGLOG("process_work_item: free %p\n", cmd->client); |
| cmd_client_cleanup(cmd->client); |
| pthread_mutex_destroy(&cmd->client->bits.localsock.mutex); |
| pthread_cond_destroy(&cmd->client->bits.localsock.cond); |
| dm_free(cmd->client); |
| return 0; |
| } |
| |
| if (!cmd->remote) { |
| DEBUGLOG("process_work_item: local\n"); |
| process_local_command(cmd->msg, cmd->msglen, cmd->client, |
| cmd->xid); |
| } else { |
| DEBUGLOG("process_work_item: remote\n"); |
| process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd, |
| cmd->csid); |
| } |
| |
| return 0; |
| } |
| |
| /* |
| * Routine that runs in the "LVM thread". |
| */ |
| static void *lvm_thread_fn(void *arg) |
| { |
| sigset_t ss; |
| struct lvm_startup_params *lvm_params = arg; |
| struct lvm_thread_cmd *cmd; |
| |
| DEBUGLOG("LVM thread function started\n"); |
| |
| /* Ignore SIGUSR1 & 2 */ |
| sigemptyset(&ss); |
| sigaddset(&ss, SIGUSR1); |
| sigaddset(&ss, SIGUSR2); |
| pthread_sigmask(SIG_BLOCK, &ss, NULL); |
| |
| /* Initialise the interface to liblvm */ |
| init_clvm(lvm_params->excl_uuid); |
| |
| /* Allow others to get moving */ |
| pthread_barrier_wait(&lvm_start_barrier); |
| DEBUGLOG("LVM thread ready for work.\n"); |
| |
| /* Now wait for some actual work */ |
| pthread_mutex_lock(&lvm_thread_mutex); |
| |
| for (;;) { |
| while (!dm_list_empty(&lvm_cmd_head)) { |
| cmd = dm_list_item(dm_list_first(&lvm_cmd_head), |
| struct lvm_thread_cmd); |
| dm_list_del(&cmd->list); |
| pthread_mutex_unlock(&lvm_thread_mutex); |
| |
| process_work_item(cmd); |
| dm_free(cmd->msg); |
| dm_free(cmd); |
| |
| pthread_mutex_lock(&lvm_thread_mutex); |
| } |
| |
| if (lvm_thread_exit) |
| break; |
| |
| DEBUGLOG("LVM thread waiting for work\n"); |
| pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex); |
| } |
| |
| pthread_mutex_unlock(&lvm_thread_mutex); |
| DEBUGLOG("LVM thread exits\n"); |
| |
| destroy_lvm(); |
| |
| pthread_exit(NULL); |
| } |
| |
| /* Pass down some work to the LVM thread */ |
| static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg, |
| int msglen, const char *csid) |
| { |
| struct lvm_thread_cmd *cmd; |
| |
| if (!(cmd = dm_malloc(sizeof(*cmd)))) |
| return ENOMEM; |
| |
| if (msglen) { |
| if (!(cmd->msg = dm_malloc(msglen))) { |
| log_error("Unable to allocate buffer space."); |
| dm_free(cmd); |
| return -1; |
| } |
| memcpy(cmd->msg, msg, msglen); |
| } |
| else |
| cmd->msg = NULL; |
| |
| cmd->client = client; |
| cmd->msglen = msglen; |
| cmd->xid = client->xid; |
| |
| if (csid) { |
| memcpy(cmd->csid, csid, max_csid_len); |
| cmd->remote = 1; |
| } else |
| cmd->remote = 0; |
| |
| DEBUGLOG("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n", |
| cmd, client, msg, msglen, csid, cmd->xid); |
| pthread_mutex_lock(&lvm_thread_mutex); |
| if (lvm_thread_exit) { |
| pthread_mutex_unlock(&lvm_thread_mutex); |
| dm_free(cmd->msg); |
| dm_free(cmd); |
| return -1; /* We are about to exit */ |
| } |
| dm_list_add(&lvm_cmd_head, &cmd->list); |
| pthread_cond_signal(&lvm_thread_cond); |
| pthread_mutex_unlock(&lvm_thread_mutex); |
| |
| return 0; |
| } |
| |
| /* Return 0 if we can talk to an existing clvmd */ |
| static int check_local_clvmd(void) |
| { |
| int local_socket; |
| int ret = 0; |
| struct sockaddr_un sockaddr = { .sun_family = AF_UNIX }; |
| |
| if (!dm_strncpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(sockaddr.sun_path))) { |
| log_error("%s: clvmd socket name too long.", CLVMD_SOCKNAME); |
| return -1; |
| } |
| |
| /* Open local socket */ |
| if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { |
| log_sys_error("socket", "local socket"); |
| return -1; |
| } |
| |
| if (connect(local_socket,(struct sockaddr *) &sockaddr, |
| sizeof(sockaddr))) { |
| log_sys_error("connect", "local socket"); |
| ret = -1; |
| } |
| |
| if (close(local_socket)) |
| log_sys_error("close", "local socket"); |
| |
| return ret; |
| } |
| |
| static void close_local_sock(int local_socket) |
| { |
| if (local_socket != -1 && close(local_socket)) |
| log_sys_error("close", CLVMD_SOCKNAME); |
| |
| if (CLVMD_SOCKNAME[0] != '\0' && unlink(CLVMD_SOCKNAME)) |
| stack; |
| } |
| |
| /* Open the local socket, that's the one we talk to libclvm down */ |
| static int open_local_sock(void) |
| { |
| mode_t old_mask; |
| int local_socket = -1; |
| struct sockaddr_un sockaddr = { .sun_family = AF_UNIX }; |
| |
| if (!dm_strncpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(sockaddr.sun_path))) { |
| log_error("%s: clvmd socket name too long.", CLVMD_SOCKNAME); |
| return -1; |
| } |
| |
| close_local_sock(local_socket); |
| |
| (void) dm_prepare_selinux_context(CLVMD_SOCKNAME, S_IFSOCK); |
| old_mask = umask(0077); |
| |
| /* Open local socket */ |
| if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { |
| log_error("Can't create local socket: %m"); |
| goto error; |
| } |
| |
| /* Set Close-on-exec & non-blocking */ |
| if (fcntl(local_socket, F_SETFD, 1)) |
| DEBUGLOG("setting CLOEXEC on local_socket failed: %s\n", strerror(errno)); |
| if (fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK)) |
| DEBUGLOG("setting O_NONBLOCK on local_socket failed: %s\n", strerror(errno)); |
| |
| |
| if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) { |
| log_error("can't bind local socket: %m"); |
| goto error; |
| } |
| if (listen(local_socket, 1) != 0) { |
| log_error("listen local: %m"); |
| goto error; |
| } |
| |
| umask(old_mask); |
| (void) dm_prepare_selinux_context(NULL, 0); |
| return local_socket; |
| error: |
| close_local_sock(local_socket); |
| umask(old_mask); |
| (void) dm_prepare_selinux_context(NULL, 0); |
| return -1; |
| } |
| |
| void process_message(struct local_client *client, char *buf, int len, |
| const char *csid) |
| { |
| char nodename[max_cluster_member_name_len]; |
| struct clvm_header *inheader = (struct clvm_header *) buf; |
| ntoh_clvm(inheader); /* Byteswap fields */ |
| |
| if (verify_message(buf, len) < 0) { |
| clops->name_from_csid(csid, nodename); |
| log_error("process_message from %s len %d bad verify.", nodename, len); |
| dump_message(buf, len); |
| return; |
| } |
| |
| if (inheader->cmd == CLVMD_CMD_REPLY) |
| process_reply(inheader, len, csid); |
| else |
| add_to_lvmqueue(client, inheader, len, csid); |
| } |
| |
| |
/*
 * Per-node callback used by check_all_clvmds_running().
 *
 * For each node reported as down (node_up == 0), add an EHOSTDOWN reply
 * with the text "CLVMD not running" to the client's reply list.
 * Nodes that are up are ignored.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	/* sizeof counts the terminating NUL, so it is sent along with the text. */
	static const char not_running_msg[] = "CLVMD not running";

	if (!node_up)
		add_reply_to_list(client, EHOSTDOWN, csid, not_running_msg,
				  (int) sizeof(not_running_msg));
}
| |
| /* Check to see if all CLVMDs are running (ie one on |
| every node in the cluster). |
| If not, returns -1 and prints out a list of errant nodes */ |
| static int check_all_clvmds_running(struct local_client *client) |
| { |
| DEBUGLOG("check_all_clvmds_running\n"); |
| return clops->cluster_do_node_callback(client, check_all_callback); |
| } |
| |
| /* Return a local_client struct given a client ID. |
| client IDs are in network byte order */ |
| static struct local_client *find_client(int clientid) |
| { |
| struct local_client *thisfd; |
| |
| for (thisfd = &local_client_head; thisfd; thisfd = thisfd->next) |
| if (thisfd->fd == (int)ntohl(clientid)) |
| return thisfd; |
| |
| return NULL; |
| } |
| |
| /* Byte-swapping routines for the header so we |
| work in a heterogeneous environment */ |
| static void hton_clvm(struct clvm_header *hdr) |
| { |
| hdr->status = htonl(hdr->status); |
| hdr->arglen = htonl(hdr->arglen); |
| hdr->xid = htons(hdr->xid); |
| /* Don't swap clientid as it's only a token as far as |
| remote nodes are concerned */ |
| } |
| |
| static void ntoh_clvm(struct clvm_header *hdr) |
| { |
| hdr->status = ntohl(hdr->status); |
| hdr->arglen = ntohl(hdr->arglen); |
| hdr->xid = ntohs(hdr->xid); |
| } |
| |
/*
 * Handler for SIGUSR2, which is sent to subthreads to kill them.
 *
 * It intentionally does nothing. Having a handler installed presumably lets
 * the signal interrupt blocking calls in the target thread without
 * terminating the process (TODO confirm against the sender).
 *
 * No logging is done here. DEBUGLOG() presumably goes through stdio or
 * syslog, neither of which is async-signal-safe. Calling it from a handler
 * can deadlock if the signal arrives while the thread already holds a
 * logging/stdio lock.
 */
static void sigusr2_handler(int sig)
{
	(void) sig;
}
| |
| static void sigterm_handler(int sig) |
| { |
| quit = 1; |
| return; |
| } |
| |
| static void sighup_handler(int sig) |
| { |
| reread_config = 1; |
| } |
| |
| int sync_lock(const char *resource, int mode, int flags, int *lockid) |
| { |
| return clops->sync_lock(resource, mode, flags, lockid); |
| } |
| |
| int sync_unlock(const char *resource, int lockid) |
| { |
| return clops->sync_unlock(resource, lockid); |
| } |
| |
| static if_type_t parse_cluster_interface(char *ifname) |
| { |
| if_type_t iface = IF_AUTO; |
| |
| if (!strcmp(ifname, "auto")) |
| iface = IF_AUTO; |
| else if (!strcmp(ifname, "cman")) |
| iface = IF_CMAN; |
| else if (!strcmp(ifname, "openais")) |
| iface = IF_OPENAIS; |
| else if (!strcmp(ifname, "corosync")) |
| iface = IF_COROSYNC; |
| else if (!strcmp(ifname, "singlenode")) |
| iface = IF_SINGLENODE; |
| |
| return iface; |
| } |
| |
/*
 * Try and find a cluster system in corosync's objdb, if it is running. This is
 * only called if the command-line option is not present, and if it fails
 * we still try the interfaces in order.
 *
 * With confdb support compiled in (HAVE_COROSYNC_CONFDB_H), this walks
 * parent -> "cluster" -> "clvmd" in confdb, reads that object's "interface"
 * key, and maps the value with parse_cluster_interface().
 *
 * Returns the configured interface type. Returns IF_AUTO if confdb cannot be
 * initialised, if any lookup step fails, or if confdb support is not
 * compiled in.
 */
static if_type_t get_cluster_type(void)
{
#ifdef HAVE_COROSYNC_CONFDB_H
	confdb_handle_t handle;
	if_type_t type = IF_AUTO;	/* result when any lookup step fails */
	int result;
	char buf[255];
	size_t namelen = sizeof(buf);
	hdb_handle_t cluster_handle;
	hdb_handle_t clvmd_handle;
	confdb_callbacks_t callbacks = { 0 };	/* no callbacks: only synchronous queries below */

	result = confdb_initialize (&handle, &callbacks);
	if (result != CS_OK)
		return type;	/* nothing to finalize yet */

	/* Locate the top-level "cluster" object. */
	result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
	if (result != CS_OK)
		goto out;

	result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
	if (result != CS_OK)
		goto out;

	/* Locate its "clvmd" child object. */
	result = confdb_object_find_start(handle, cluster_handle);
	if (result != CS_OK)
		goto out;

	result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
	if (result != CS_OK)
		goto out;

	/* Read the "interface" key; namelen receives the value's length. */
	result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
	if (result != CS_OK)
		goto out;

	/*
	 * The value is not NUL-terminated here; clamp so the terminator fits.
	 * NOTE(review): this clamp runs only after confdb_key_get() has filled
	 * buf. Confirm from the corosync confdb API that the call never copies
	 * more than sizeof(buf) bytes; otherwise this check comes too late.
	 */
	if (namelen >= sizeof(buf))
		namelen = sizeof(buf) - 1;

	buf[namelen] = '\0';
	type = parse_cluster_interface(buf);
	DEBUGLOG("got interface type '%s' from confdb\n", buf);
out:
	confdb_finalize(handle);
	return type;
#else
	return IF_AUTO;
#endif
}