| /* |
| * (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org> |
| * (C) 2011 by Vyatta Inc. <http://www.vyatta.com> |
| * |
| * This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation; either version 2 of the License, or |
| * (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program; if not, write to the Free Software |
| * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. |
| */ |
| |
| #include "conntrackd.h" |
| #include "sync.h" |
| #include "queue.h" |
| #include "network.h" |
| #include "alarm.h" |
| #include "log.h" |
| #include "cache.h" |
| #include "fds.h" |
| |
| #include <string.h> |
| #include <errno.h> |
| |
| #if 0 |
| #define dp printf |
| #else |
| #define dp(...) |
| #endif |
| |
| struct queue *rs_queue; |
| static uint32_t exp_seq; |
| static uint32_t window; |
| static uint32_t ack_from; |
| static int ack_from_set = 0; |
| static struct alarm_block alive_alarm; |
| |
| enum { |
| HELLO_INIT, |
| HELLO_SAY, |
| HELLO_DONE, |
| }; |
| static int hello_state = HELLO_INIT; |
| static int say_hello_back; |
| |
| /* XXX: alive message expiration configurable */ |
| #define ALIVE_INT 1 |
| |
| struct cache_ftfw { |
| struct queue_node qnode; |
| struct cache_object *obj; |
| uint32_t seq; |
| }; |
| |
| static void cache_ftfw_add(struct cache_object *obj, void *data) |
| { |
| struct cache_ftfw *cn = data; |
| cn->obj = obj; |
| /* These nodes are not inserted in the list */ |
| queue_node_init(&cn->qnode, Q_ELEM_OBJ); |
| } |
| |
| static void cache_ftfw_del(struct cache_object *obj, void *data) |
| { |
| struct cache_ftfw *cn = data; |
| queue_del(&cn->qnode); |
| } |
| |
| static struct cache_extra cache_ftfw_extra = { |
| .size = sizeof(struct cache_ftfw), |
| .add = cache_ftfw_add, |
| .destroy = cache_ftfw_del |
| }; |
| |
| static void nethdr_set_hello(struct nethdr *net) |
| { |
| switch(hello_state) { |
| case HELLO_INIT: |
| hello_state = HELLO_SAY; |
| /* fall through */ |
| case HELLO_SAY: |
| net->flags |= NET_F_HELLO; |
| break; |
| } |
| if (say_hello_back) { |
| net->flags |= NET_F_HELLO_BACK; |
| say_hello_back = 0; |
| } |
| } |
| |
| static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) |
| { |
| struct queue_object *qobj; |
| struct nethdr_ack *ack; |
| |
| qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); |
| if (qobj == NULL) |
| return; |
| |
| ack = (struct nethdr_ack *)qobj->data; |
| ack->type = NET_T_CTL; |
| ack->flags = flags; |
| ack->from = from; |
| ack->to = to; |
| |
| if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0) |
| queue_object_free(qobj); |
| } |
| |
| static void tx_queue_add_ctlmsg2(uint32_t flags) |
| { |
| struct queue_object *qobj; |
| struct nethdr *ctl; |
| |
| qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); |
| if (qobj == NULL) |
| return; |
| |
| ctl = (struct nethdr *)qobj->data; |
| ctl->type = NET_T_CTL; |
| ctl->flags = flags; |
| |
| if (queue_add(STATE_SYNC(tx_queue), &qobj->qnode) < 0) |
| queue_object_free(qobj); |
| } |
| |
| /* this function is called from the alarm framework */ |
| static void do_alive_alarm(struct alarm_block *a, void *data) |
| { |
| if (ack_from_set && nethdr_track_is_seq_set()) { |
| /* exp_seq contains the last update received */ |
| tx_queue_add_ctlmsg(NET_F_ACK, |
| ack_from, |
| STATE_SYNC(last_seq_recv)); |
| ack_from_set = 0; |
| } else |
| tx_queue_add_ctlmsg2(NET_F_ALIVE); |
| |
| add_alarm(&alive_alarm, ALIVE_INT, 0); |
| } |
| |
| static int ftfw_init(void) |
| { |
| rs_queue = queue_create("rsqueue", CONFIG(resend_queue_size), 0); |
| if (rs_queue == NULL) { |
| dlog(LOG_ERR, "cannot create rs queue"); |
| return -1; |
| } |
| |
| init_alarm(&alive_alarm, NULL, do_alive_alarm); |
| add_alarm(&alive_alarm, ALIVE_INT, 0); |
| |
| /* set ack window size */ |
| window = CONFIG(window_size); |
| |
| return 0; |
| } |
| |
| static void ftfw_kill(void) |
| { |
| queue_destroy(rs_queue); |
| } |
| |
| static int do_cache_to_tx(void *data1, void *data2) |
| { |
| struct cache_object *obj = data2; |
| struct cache_ftfw *cn = cache_get_extra(obj); |
| |
| if (queue_in(rs_queue, &cn->qnode)) { |
| queue_del(&cn->qnode); |
| queue_add(STATE_SYNC(tx_queue), &cn->qnode); |
| } else { |
| if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0) |
| cache_object_get(obj); |
| } |
| return 0; |
| } |
| |
| static int rs_queue_dump(struct queue_node *n, const void *data2) |
| { |
| const int *fd = data2; |
| char buf[512]; |
| int size; |
| |
| switch(n->type) { |
| case Q_ELEM_CTL: { |
| struct nethdr *net = queue_node_data(n); |
| size = sprintf(buf, "control -> seq:%u flags:%u\n", |
| net->seq, net->flags); |
| break; |
| } |
| case Q_ELEM_OBJ: { |
| struct cache_ftfw *cn = (struct cache_ftfw *) n; |
| size = sprintf(buf, "object -> seq:%u\n", cn->seq); |
| break; |
| } |
| default: |
| return 0; |
| } |
| send(*fd, buf, size, 0); |
| return 0; |
| } |
| |
| static void ftfw_local_queue(int fd) |
| { |
| char buf[512]; |
| int size; |
| |
| size = sprintf(buf, "resent queue (len=%u)\n", queue_len(rs_queue)); |
| send(fd, buf, size, 0); |
| queue_iterate(rs_queue, &fd, rs_queue_dump); |
| } |
| |
| static int ftfw_local(int fd, int type, void *data) |
| { |
| int ret = LOCAL_RET_OK; |
| |
| switch(type) { |
| case REQUEST_DUMP: |
| dlog(LOG_NOTICE, "request resync"); |
| tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0); |
| break; |
| case SEND_BULK: |
| dlog(LOG_NOTICE, "sending bulk update"); |
| cache_iterate(STATE(mode)->internal->ct.data, |
| NULL, do_cache_to_tx); |
| cache_iterate(STATE(mode)->internal->exp.data, |
| NULL, do_cache_to_tx); |
| break; |
| case STATS_RSQUEUE: |
| ftfw_local_queue(fd); |
| break; |
| } |
| |
| return ret; |
| } |
| |
| static int rs_queue_to_tx(struct queue_node *n, const void *data) |
| { |
| const struct nethdr_ack *nack = data; |
| |
| switch(n->type) { |
| case Q_ELEM_CTL: { |
| struct nethdr_ack *net = queue_node_data(n); |
| |
| if (before(net->seq, nack->from)) |
| return 0; /* continue */ |
| else if (after(net->seq, nack->to)) |
| return 1; /* break */ |
| |
| dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", |
| net->seq, net->flags, net->len); |
| |
| queue_del(n); |
| queue_add(STATE_SYNC(tx_queue), n); |
| break; |
| } |
| case Q_ELEM_OBJ: { |
| struct cache_ftfw *cn; |
| |
| cn = (struct cache_ftfw *) n; |
| if (before(cn->seq, nack->from)) |
| return 0; |
| else if (after(cn->seq, nack->to)) |
| return 1; |
| |
| dp("resending nack'ed (oldseq=%u)\n", cn->seq); |
| |
| queue_del(n); |
| queue_add(STATE_SYNC(tx_queue), n); |
| break; |
| } |
| } |
| return 0; |
| } |
| |
| static int rs_queue_empty(struct queue_node *n, const void *data) |
| { |
| const struct nethdr_ack *h = data; |
| |
| switch(n->type) { |
| case Q_ELEM_CTL: { |
| struct nethdr_ack *net = queue_node_data(n); |
| |
| if (h == NULL) { |
| queue_del(n); |
| queue_object_free((struct queue_object *)n); |
| return 0; |
| } |
| if (before(net->seq, h->from)) |
| return 0; /* continue */ |
| else if (after(net->seq, h->to)) |
| return 1; /* break */ |
| |
| dp("remove from queue (seq=%u)\n", net->seq); |
| queue_del(n); |
| queue_object_free((struct queue_object *)n); |
| break; |
| } |
| case Q_ELEM_OBJ: { |
| struct cache_ftfw *cn; |
| |
| cn = (struct cache_ftfw *) n; |
| if (h == NULL) { |
| queue_del(n); |
| cache_object_put(cn->obj); |
| return 0; |
| } |
| if (before(cn->seq, h->from)) |
| return 0; |
| else if (after(cn->seq, h->to)) |
| return 1; |
| |
| dp("queue: deleting from queue (seq=%u)\n", cn->seq); |
| queue_del(n); |
| cache_object_put(cn->obj); |
| break; |
| } |
| } |
| return 0; |
| } |
| |
| static int digest_msg(const struct nethdr *net) |
| { |
| if (IS_DATA(net)) |
| return MSG_DATA; |
| |
| else if (IS_ACK(net)) { |
| const struct nethdr_ack *h = (const struct nethdr_ack *) net; |
| |
| if (before(h->to, h->from)) |
| return MSG_BAD; |
| |
| queue_iterate(rs_queue, h, rs_queue_empty); |
| return MSG_CTL; |
| |
| } else if (IS_NACK(net)) { |
| const struct nethdr_ack *nack = (const struct nethdr_ack *) net; |
| |
| if (before(nack->to, nack->from)) |
| return MSG_BAD; |
| |
| queue_iterate(rs_queue, nack, rs_queue_to_tx); |
| return MSG_CTL; |
| |
| } else if (IS_RESYNC(net)) { |
| dp("RESYNC ALL\n"); |
| cache_iterate(STATE(mode)->internal->ct.data, NULL, |
| do_cache_to_tx); |
| cache_iterate(STATE(mode)->internal->exp.data, NULL, |
| do_cache_to_tx); |
| return MSG_CTL; |
| |
| } else if (IS_ALIVE(net)) |
| return MSG_CTL; |
| |
| return MSG_BAD; |
| } |
| |
| static int digest_hello(const struct nethdr *net) |
| { |
| int ret = 0; |
| |
| if (IS_HELLO(net)) { |
| say_hello_back = 1; |
| ret = 1; |
| } |
| if (IS_HELLO_BACK(net)) { |
| /* this is a hello back for a requested hello */ |
| if (hello_state == HELLO_SAY) |
| hello_state = HELLO_DONE; |
| } |
| |
| return ret; |
| } |
| |
| static int ftfw_recv(const struct nethdr *net) |
| { |
| int ret = MSG_DATA; |
| |
| if (digest_hello(net)) { |
| /* we have received a hello while we had data to acknowledge. |
| * reset the window, the other doesn't know anthing about it. */ |
| if (ack_from_set && before(net->seq, ack_from)) { |
| window = CONFIG(window_size) - 1; |
| ack_from = net->seq; |
| } |
| |
| /* XXX: flush the resend queues since the other does not |
| * know anything about that data, we are unreliable until |
| * the helloing finishes */ |
| queue_iterate(rs_queue, NULL, rs_queue_empty); |
| |
| goto bypass; |
| } |
| |
| switch (nethdr_track_seq(net->seq, &exp_seq)) { |
| case SEQ_AFTER: |
| ret = digest_msg(net); |
| if (ret == MSG_BAD) { |
| ret = MSG_BAD; |
| goto out; |
| } |
| |
| if (ack_from_set) { |
| tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1); |
| ack_from_set = 0; |
| } |
| |
| tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); |
| |
| /* count this message as part of the new window */ |
| window = CONFIG(window_size) - 1; |
| ack_from = net->seq; |
| ack_from_set = 1; |
| break; |
| |
| case SEQ_BEFORE: |
| /* we don't accept delayed packets */ |
| ret = MSG_DROP; |
| break; |
| |
| case SEQ_UNSET: |
| case SEQ_IN_SYNC: |
| bypass: |
| ret = digest_msg(net); |
| if (ret == MSG_BAD) { |
| ret = MSG_BAD; |
| goto out; |
| } |
| |
| if (!ack_from_set) { |
| ack_from_set = 1; |
| ack_from = net->seq; |
| } |
| |
| if (--window <= 0) { |
| /* received a window, send an acknowledgement */ |
| tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq); |
| window = CONFIG(window_size); |
| ack_from_set = 0; |
| } |
| } |
| |
| out: |
| if ((ret == MSG_DATA || ret == MSG_CTL)) |
| nethdr_track_update_seq(net->seq); |
| |
| return ret; |
| } |
| |
| static void rs_queue_purge_full(void) |
| { |
| struct queue_node *n; |
| |
| n = queue_del_head(rs_queue); |
| switch(n->type) { |
| case Q_ELEM_CTL: { |
| struct queue_object *qobj = (struct queue_object *)n; |
| queue_object_free(qobj); |
| break; |
| } |
| case Q_ELEM_OBJ: { |
| struct cache_ftfw *cn; |
| |
| cn = (struct cache_ftfw *)n; |
| cache_object_put(cn->obj); |
| break; |
| } |
| } |
| } |
| |
| static int tx_queue_xmit(struct queue_node *n, const void *data) |
| { |
| queue_del(n); |
| |
| switch(n->type) { |
| case Q_ELEM_CTL: { |
| struct nethdr *net = queue_node_data(n); |
| |
| nethdr_set_hello(net); |
| |
| if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { |
| nethdr_set_ack(net); |
| } else { |
| nethdr_set_ctl(net); |
| } |
| HDR_HOST2NETWORK(net); |
| |
| dp("tx_queue sq: %u fl:%u len:%u\n", |
| ntohl(net->seq), net->flags, ntohs(net->len)); |
| |
| multichannel_send(STATE_SYNC(channel), net); |
| HDR_NETWORK2HOST(net); |
| |
| if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { |
| if (queue_add(rs_queue, n) < 0) { |
| if (errno == ENOSPC) { |
| rs_queue_purge_full(); |
| queue_add(rs_queue, n); |
| } |
| } |
| } else |
| queue_object_free((struct queue_object *)n); |
| break; |
| } |
| case Q_ELEM_OBJ: { |
| struct cache_ftfw *cn; |
| int type; |
| struct nethdr *net; |
| |
| cn = (struct cache_ftfw *)n; |
| type = object_status_to_network_type(cn->obj); |
| net = cn->obj->cache->ops->build_msg(cn->obj, type); |
| nethdr_set_hello(net); |
| |
| dp("tx_list sq: %u fl:%u len:%u\n", |
| ntohl(net->seq), net->flags, ntohs(net->len)); |
| |
| multichannel_send(STATE_SYNC(channel), net); |
| cn->seq = ntohl(net->seq); |
| if (queue_add(rs_queue, &cn->qnode) < 0) { |
| if (errno == ENOSPC) { |
| rs_queue_purge_full(); |
| queue_add(rs_queue, &cn->qnode); |
| } |
| } |
| /* we release the object once we get the acknowlegment */ |
| break; |
| } |
| } |
| |
| return 0; |
| } |
| |
| static void ftfw_xmit(void) |
| { |
| queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); |
| add_alarm(&alive_alarm, ALIVE_INT, 0); |
| dp("tx_queue_len:%u rs_queue_len:%u\n", |
| queue_len(tx_queue), queue_len(rs_queue)); |
| } |
| |
| static void ftfw_enqueue(struct cache_object *obj, int type) |
| { |
| struct cache_ftfw *cn = cache_get_extra(obj); |
| if (queue_in(rs_queue, &cn->qnode)) { |
| queue_del(&cn->qnode); |
| queue_add(STATE_SYNC(tx_queue), &cn->qnode); |
| } else { |
| if (queue_add(STATE_SYNC(tx_queue), &cn->qnode) > 0) |
| cache_object_get(obj); |
| } |
| } |
| |
| struct sync_mode sync_ftfw = { |
| .internal_cache_flags = NO_FEATURES, |
| .external_cache_flags = NO_FEATURES, |
| .internal_cache_extra = &cache_ftfw_extra, |
| .init = ftfw_init, |
| .kill = ftfw_kill, |
| .local = ftfw_local, |
| .recv = ftfw_recv, |
| .enqueue = ftfw_enqueue, |
| .xmit = ftfw_xmit, |
| }; |