From 5834fc2036259c9f9924156638502bbf9aa15f06 Mon Sep 17 00:00:00 2001 From: Alex Xu Date: Sun, 3 Jul 2016 21:43:13 -0400 Subject: Use non-blocking IO, loop recvfrom. --- src/client.c | 356 ++++++++++++++++++++++++++++++----------------------------- src/server.c | 312 +++++++++++++++++++++++++++------------------------ 2 files changed, 353 insertions(+), 315 deletions(-) (limited to 'src') diff --git a/src/client.c b/src/client.c index c29d733..21f6217 100644 --- a/src/client.c +++ b/src/client.c @@ -1,4 +1,6 @@ +#include #include +#include #include #include #include @@ -192,95 +194,95 @@ static void cc_cb(struct ev_loop *loop, ev_io *w, int revents __attribute__((unu socklen_t pkt_addrlen = sizeof(struct sockaddr_in6); ssize_t should_ssz, rsz, ssz; - if ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) == -1) { - perror("recvfrom"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + while ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) != -1) { + DBG("received %zd raw bytes on client", rsz); - DBG("received %zd raw bytes on client", rsz); + if (pkt_addrlen > sizeof(struct sockaddr_in6)) + abort(); - if (pkt_addrlen > sizeof(struct sockaddr_in6)) - abort(); - - if ((size_t)rsz < sizeof(struct tcphdr)) - return; + if ((size_t)rsz < sizeof(struct tcphdr)) + return; - struct tcphdr *rhdr = (struct tcphdr *)rbuf; + struct tcphdr *rhdr = (struct tcphdr *)rbuf; - struct o_c_sock *sock; + struct o_c_sock *sock; - HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock); + HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock); - if (!sock) { - DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport)); - return; - } + if (!sock) { + DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport)); + return; + } - if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) { - DBG("SYN/ACK received, connection established"); + if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) { + DBG("SYN/ACK received, connection established"); - sock->status = TCP_ESTABLISHED; + sock->status = TCP_ESTABLISHED; - struct tcphdr shdr = { - .th_sport = sock->l_port, - .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, - .th_seq = htonl(sock->seq_num), - .th_ack = rhdr->th_seq, - .th_win = 65535, - .th_flags = TH_ACK, - .th_off = 5 - }; + struct tcphdr shdr = { + .th_sport = sock->l_port, + .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, + .th_seq = htonl(sock->seq_num), + .th_ack = rhdr->th_seq, + .th_win = 65535, + .th_flags = TH_ACK, + .th_off = 5 + }; - sock->seq_num += sock->pending_data_size; + sock->seq_num += sock->pending_data_size; - struct iovec iovs[2] = { - { .iov_base = &shdr, .iov_len = sizeof(shdr) }, - { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size } - }; + struct iovec iovs[2] = { + { .iov_base = &shdr, .iov_len = sizeof(shdr) }, + { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size } + }; - struct msghdr msghdr = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = iovs, - .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) - }; + struct msghdr msghdr = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iovs, + .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) + }; - should_ssz = sizeof(shdr) + sock->pending_data_size; - ssz = sendmsg(rsock->fd, &msghdr, 0); + should_ssz = sizeof(shdr) + sock->pending_data_size; + ssz = sendmsg(rsock->fd, &msghdr, 0); - if (ssz < 0) { - perror("sendmsg"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } else if ((size_t)ssz != should_ssz) { - fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); - } + if (ssz < 0) { + perror("sendmsg"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)ssz != should_ssz) { + fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); + } - free(sock->pending_data); + free(sock->pending_data); - ev_timer_stop(EV_A_ &sock->tm_w); - // this delay is not very important because one, it is OK if UDP - // packets are lost, and two, they are only delayed until a new - // connection is established. however, it is probably a good idea to - // set this higher than the UDP ping delay if you are using one. - ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.); - ev_timer_start(EV_A_ &sock->tm_w); - } + ev_timer_stop(EV_A_ &sock->tm_w); + // this delay is not very important because one, it is OK if UDP + // packets are lost, and two, they are only delayed until a new + // connection is established. however, it is probably a good idea to + // set this higher than the UDP ping delay if you are using one. + ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.); + ev_timer_start(EV_A_ &sock->tm_w); + } - should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT; - if (should_ssz > 0) { - DBG("sending %zd bytes to client", should_ssz); - ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen); + should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT; + if (should_ssz > 0) { + DBG("sending %zd bytes to client", should_ssz); + ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen); - if (ssz < 0) { - perror("sendto"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } else if ((size_t)ssz != should_ssz) { - fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); + if (ssz < 0) { + perror("sendto"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)ssz != should_ssz) { + fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); + } } } + if (errno != EAGAIN) { + perror("recvfrom"); + ev_break(EV_A_ EVBREAK_ONE); + } } #define SIX_OR_FOUR(sa, six, four, neither) \ @@ -311,6 +313,11 @@ static int c_rsock_init(struct o_c_sock *sock, struct addrinfo *res) { return 0; } + if (fcntl(sock->rsock->fd, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + return 0; + } + struct sockaddr_storage our_addr; socklen_t our_addr_len = sizeof(our_addr); int r = getsockname(sock->rsock->fd, (struct sockaddr *)&our_addr, &our_addr_len); @@ -331,136 +338,136 @@ static void cs_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { ssize_t sz; char rbuf[65536]; - if ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) == -1) { - perror("recvfrom"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + while ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) != -1) { + DBG("received %zd bytes on server", sz); - DBG("received %zd bytes on server", sz); + if (addresslen != c_data->s_addrlen) + abort(); - if (addresslen != c_data->s_addrlen) - abort(); + struct o_c_sock *sock; + HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock); - struct o_c_sock *sock; - HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock); - - if (!sock) { - DBG("could not locate matching socket for client, initializing new connection"); - sock = calloc(1, sizeof(*sock)); - - struct addrinfo *res; - DBG("looking up %s:%s", c_data->r_host, c_data->r_port); - // TODO: make this asynchronous - int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res); - if (r) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + if (!sock) { + DBG("could not locate matching socket for client, initializing new connection"); + sock = calloc(1, sizeof(*sock)); + + struct addrinfo *res; + DBG("looking up %s:%s", c_data->r_host, c_data->r_port); + // TODO: make this asynchronous + int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res); + if (r) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + + sock->c_address = malloc(addresslen); + memcpy(sock->c_address, &c_data->pkt_addr, addresslen); - sock->c_address = malloc(addresslen); - memcpy(sock->c_address, &c_data->pkt_addr, addresslen); + HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock); - HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock); + if (!sock->rsock) { + DBG("could not locate remote socket to host, initializing new raw socket"); + if (!c_rsock_init(sock, res)) { + ev_break(EV_A_ EVBREAK_ONE); + return; + } + sock->rsock->c_data = c_data; + + ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ); + sock->rsock->io_w.data = sock->rsock; + ev_io_start(EV_A_ &sock->rsock->io_w); + + HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock); + } - if (!sock->rsock) { - DBG("could not locate remote socket to host, initializing new raw socket"); - if (!c_rsock_init(sock, res)) { + uint16_t l_port = reserve_port(sock->rsock->used_ports); + DBG("using port %hu", l_port); + if (!l_port) { + fputs("we ran out of ports?\n", stderr); ev_break(EV_A_ EVBREAK_ONE); return; } - sock->rsock->c_data = c_data; + sock->l_port = htons(l_port); - ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ); - sock->rsock->io_w.data = sock->rsock; - ev_io_start(EV_A_ &sock->rsock->io_w); + HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock); + HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock); - HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock); - } + sock->seq_num = random(); - uint16_t l_port = reserve_port(sock->rsock->used_ports); - DBG("using port %hu", l_port); - if (!l_port) { - fputs("we ran out of ports?\n", stderr); - ev_break(EV_A_ EVBREAK_ONE); - return; - } - sock->l_port = htons(l_port); + struct tcphdr buf = { + .th_sport = sock->l_port, + .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, + .th_seq = htonl(sock->seq_num++), + .th_flags = TH_SYN, + .th_off = 5 + }; + + sock->pending_data = malloc(sz); + memcpy(sock->pending_data, rbuf, sz); + sock->pending_data_size = sz; + + DBG("sending SYN to remote"); + sz = send(sock->rsock->fd, &buf, sizeof(buf), 0); + if (sz < 0) { + perror("send"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)sz != sizeof(buf)) { + fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + } - HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock); - HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock); + // resend SYN - sock->seq_num = random(); + ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]); + sock->tm_w.data = sock; + sock->syn_retries = 0; + c_adv_syn_tm(EV_A_ sock); - struct tcphdr buf = { + sock->status = TCP_SYN_SENT; + + return; + } + + struct tcphdr tcp_hdr = { .th_sport = sock->l_port, .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, - .th_seq = htonl(sock->seq_num++), - .th_flags = TH_SYN, - .th_off = 5 + .th_seq = htonl(sock->seq_num), + .th_off = 5, + .th_win = 65535, + .th_flags = TH_PUSH + }; + + sock->seq_num += sz; + + struct iovec iovs[2] = { + { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) }, + { .iov_base = rbuf, .iov_len = sz } }; - sock->pending_data = malloc(sz); - memcpy(sock->pending_data, rbuf, sz); - sock->pending_data_size = sz; + struct msghdr msghdr = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iovs, + .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) + }; - DBG("sending SYN to remote"); - sz = send(sock->rsock->fd, &buf, sizeof(buf), 0); + size_t should_send_size = sizeof(tcp_hdr) + sz; + DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz); + sz = sendmsg(sock->rsock->fd, &msghdr, 0); if (sz < 0) { - perror("send"); + perror("sendmsg"); ev_break(EV_A_ EVBREAK_ONE); return; - } else if ((size_t)sz != sizeof(buf)) { - fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + } else if ((size_t)sz != should_send_size) { + fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); } - - // resend SYN - - ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]); - sock->tm_w.data = sock; - sock->syn_retries = 0; - c_adv_syn_tm(EV_A_ sock); - - sock->status = TCP_SYN_SENT; - - return; + ev_timer_again(EV_A_ &sock->tm_w); } - - struct tcphdr tcp_hdr = { - .th_sport = sock->l_port, - .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, - .th_seq = htonl(sock->seq_num), - .th_off = 5, - .th_win = 65535, - .th_flags = TH_PUSH - }; - - sock->seq_num += sz; - - struct iovec iovs[2] = { - { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) }, - { .iov_base = rbuf, .iov_len = sz } - }; - - struct msghdr msghdr = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = iovs, - .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) - }; - - size_t should_send_size = sizeof(tcp_hdr) + sz; - DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz); - sz = sendmsg(sock->rsock->fd, &msghdr, 0); - if (sz < 0) { - perror("sendmsg"); + if (errno != EAGAIN) { + perror("recvfrom"); ev_break(EV_A_ EVBREAK_ONE); - return; - } else if ((size_t)sz != should_send_size) { - fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); } - ev_timer_again(EV_A_ &sock->tm_w); } static void c_cleanup() { @@ -509,6 +516,11 @@ int start_client(const char *s_host, const char *s_port, const char *r_host, con freeaddrinfo(res); + if (fcntl(c_data.s_sock, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + return 4; + } + global_c_data = &c_data; atexit(c_cleanup); diff --git a/src/server.c b/src/server.c index 158c0af..462212f 100644 --- a/src/server.c +++ b/src/server.c @@ -1,5 +1,7 @@ #include +#include #include +#include #include #include #include @@ -84,51 +86,58 @@ static void sc_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { DBG("-- entering sc_cb --"); - if ((sz = recv(w->fd, rbuf, sizeof(rbuf), 0)) < 0) { - perror("recv"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + while ((sz = recv(w->fd, rbuf, sizeof(rbuf), 0)) > 0) { + DBG("received %zd bytes matching socket %p", sz, sock); - DBG("received %zd bytes matching socket %p", sz, sock); + struct tcphdr hdr; + s_prep_c_addr(sock, &hdr); + hdr.th_off = 5; - struct tcphdr hdr; - s_prep_c_addr(sock, &hdr); - hdr.th_off = 5; + struct iovec iovs[2] = { + { .iov_base = &hdr, .iov_len = sizeof(hdr) }, + { .iov_base = rbuf, .iov_len = sz } + }; - struct iovec iovs[2] = { - { .iov_base = &hdr, .iov_len = sizeof(hdr) }, - { .iov_base = rbuf, .iov_len = sz } - }; + in_port_t c_port = ((struct sockaddr_in *)&sock->c_addr)->sin_port; + ((struct sockaddr_in *)&sock->c_addr)->sin_port = 0; - in_port_t c_port = ((struct sockaddr_in *)&sock->c_addr)->sin_port; - ((struct sockaddr_in *)&sock->c_addr)->sin_port = 0; + struct msghdr msghdr = { + .msg_name = &sock->c_addr, + .msg_namelen = sock->s_data->s_addrlen, + .msg_iov = iovs, + .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) + }; - struct msghdr msghdr = { - .msg_name = &sock->c_addr, - .msg_namelen = sock->s_data->s_addrlen, - .msg_iov = iovs, - .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) - }; + size_t should_send_size = sizeof(hdr) + sz; - size_t should_send_size = sizeof(hdr) + sz; + assert(sock->status == TCP_ESTABLISHED); - assert(sock->status == TCP_ESTABLISHED); + DBG("sending %zd bytes to client", should_send_size); + sz = sendmsg(sock->s_data->s_sock, &msghdr, 0); - DBG("sending %zd bytes to client", should_send_size); - sz = sendmsg(sock->s_data->s_sock, &msghdr, 0); + ((struct sockaddr_in *)&sock->c_addr)->sin_port = c_port; - ((struct sockaddr_in *)&sock->c_addr)->sin_port = c_port; + if (sz < 0) { + perror("sendmsg"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)sz != should_send_size) { + fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); + } + + ev_timer_again(EV_A_ &sock->tm_w); - if (sz < 0) { - perror("sendmsg"); - ev_break(EV_A_ EVBREAK_ONE); return; - } else if ((size_t)sz != should_send_size) { - fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); } - ev_timer_again(EV_A_ &sock->tm_w); + if (sz == 0) + abort(); + + if (errno != EAGAIN) { + perror("recv"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } } static void ss_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { @@ -140,158 +149,169 @@ static void ss_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { DBG("-- entering ss_cb --"); - if ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&s_data->pkt_addr, &c_addrlen)) < 0) { - perror("recvfrom"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } - - if (c_addrlen != s_data->s_addrlen) - abort(); + while ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&s_data->pkt_addr, &c_addrlen)) > 0) { + if (c_addrlen != s_data->s_addrlen) + abort(); #ifdef DEBUG - char hbuf[NI_MAXHOST]; - r = getnameinfo((struct sockaddr *)&s_data->pkt_addr, c_addrlen, hbuf, sizeof(hbuf), NULL, 0, NI_NUMERICHOST); - if (r) { - fprintf(stderr, "getnameinfo: %s\n", gai_strerror(r)); - ev_break(EV_A_ EVBREAK_ONE); - return; - } - DBG("received %zd bytes from %s", sz, hbuf); + char hbuf[NI_MAXHOST]; + r = getnameinfo((struct sockaddr *)&s_data->pkt_addr, c_addrlen, hbuf, sizeof(hbuf), NULL, 0, NI_NUMERICHOST); + if (r) { + fprintf(stderr, "getnameinfo: %s\n", gai_strerror(r)); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + DBG("received %zd bytes from %s", sz, hbuf); #endif - if ((size_t)sz < sizeof(struct tcphdr)) { - DBG("packet is smaller than TCP header, ignoring"); - return; - } + if ((size_t)sz < sizeof(struct tcphdr)) { + DBG("packet is smaller than TCP header, ignoring"); + return; + } - struct tcphdr *tcphdr = (struct tcphdr *)rbuf; + struct tcphdr *tcphdr = (struct tcphdr *)rbuf; - DBG("packet received on port %hu", ntohs(tcphdr->th_dport)); + DBG("packet received on port %hu", ntohs(tcphdr->th_dport)); - if (tcphdr->th_dport != ((struct sockaddr_in *)s_data->s_addr)->sin_port) { - DBG("packet should be on port %hu, ignoring", ntohs(((struct sockaddr_in *)s_data->s_addr)->sin_port)); - return; - } + if (tcphdr->th_dport != ((struct sockaddr_in *)s_data->s_addr)->sin_port) { + DBG("packet should be on port %hu, ignoring", ntohs(((struct sockaddr_in *)s_data->s_addr)->sin_port)); + return; + } - struct o_s_sock *sock; + struct o_s_sock *sock; - const uint8_t th_flags = tcphdr->th_flags; + const uint8_t th_flags = tcphdr->th_flags; - ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = tcphdr->th_sport; + ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = tcphdr->th_sport; - HASH_FIND(hh, s_data->o_socks_by_caddr, &s_data->pkt_addr, c_addrlen, sock); + HASH_FIND(hh, s_data->o_socks_by_caddr, &s_data->pkt_addr, c_addrlen, sock); - if (!sock) { - DBG("could not locate matching socket for client addr"); + if (!sock) { + DBG("could not locate matching socket for client addr"); - if (th_flags == TH_SYN) { - sock = malloc(sizeof(*sock)); + if (th_flags == TH_SYN) { + sock = malloc(sizeof(*sock)); - DBG("packet was SYN, initializing new connection @ %p", sock); + DBG("packet was SYN, initializing new connection @ %p", sock); - memcpy(&sock->c_addr, &s_data->pkt_addr, c_addrlen); + memcpy(&sock->c_addr, &s_data->pkt_addr, c_addrlen); - sock->s_data = s_data; - sock->seq_num = random(); - sock->c_sock = -1; - sock->status = TCP_SYN_RECV; + sock->s_data = s_data; + sock->seq_num = random(); + sock->c_sock = -1; + sock->status = TCP_SYN_RECV; - struct tcphdr buf = { - .th_sport = tcphdr->th_dport, - .th_dport = tcphdr->th_sport, - .th_seq = htonl(sock->seq_num), - .th_ack = tcphdr->th_seq, - .th_flags = TH_SYN | TH_ACK, - .th_off = 5 - }; + struct tcphdr buf = { + .th_sport = tcphdr->th_dport, + .th_dport = tcphdr->th_sport, + .th_seq = htonl(sock->seq_num), + .th_ack = tcphdr->th_seq, + .th_flags = TH_SYN | TH_ACK, + .th_off = 5 + }; - HASH_ADD(hh, s_data->o_socks_by_caddr, c_addr, c_addrlen, sock); + HASH_ADD(hh, s_data->o_socks_by_caddr, c_addr, c_addrlen, sock); - ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = htons(0); + ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = htons(0); - DBG("sending SYN/ACK"); - if ((sz = sendto(w->fd, &buf, sizeof(buf), 0, (struct sockaddr *)&s_data->pkt_addr, s_data->s_addrlen)) == -1) { - perror("sendto"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } else if (sz != sizeof(buf)) { - fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + DBG("sending SYN/ACK"); + if ((sz = sendto(w->fd, &buf, sizeof(buf), 0, (struct sockaddr *)&s_data->pkt_addr, s_data->s_addrlen)) == -1) { + perror("sendto"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if (sz != sizeof(buf)) { + fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + } + + ev_init(&sock->tm_w, s_tm_cb); + sock->tm_w.repeat = 10. * 60.; + sock->tm_w.data = sock; + ev_timer_again(EV_A_ &sock->tm_w); + } else { + DBG("packet was not SYN, ignoring"); } - ev_init(&sock->tm_w, s_tm_cb); - sock->tm_w.repeat = 10. * 60.; - sock->tm_w.data = sock; - ev_timer_again(EV_A_ &sock->tm_w); - } else { - DBG("packet was not SYN, ignoring"); + return; } - return; - } + if (tcphdr->th_off != 5) { + DBG("TCP options were specified, dropping packet"); + return; + } - if (tcphdr->th_off != 5) { - DBG("TCP options were specified, dropping packet"); - return; - } + if (th_flags == TH_RST) { + DBG("RST received, cleaning up socket"); + sock->status = TCP_CLOSE; + s_sock_cleanup(EV_A_ sock); + } - if (th_flags == TH_RST) { - DBG("RST received, cleaning up socket"); - sock->status = TCP_CLOSE; - s_sock_cleanup(EV_A_ sock); - } + if (th_flags & ~(TH_PUSH | TH_ACK)) { + DBG("TCP flags not PSH and/or ACK, dropping packet"); + return; + } - if (th_flags & ~(TH_PUSH | TH_ACK)) { - DBG("TCP flags not PSH and/or ACK, dropping packet"); - return; - } + if (sock->status == TCP_SYN_RECV) { + assert(sock->c_sock == -1); - if (sock->status == TCP_SYN_RECV) { - assert(sock->c_sock == -1); + DBG("no UDP socket for this connection, shifting to ESTABLISHED"); - DBG("no UDP socket for this connection, shifting to ESTABLISHED"); + sock->status = TCP_ESTABLISHED; - sock->status = TCP_ESTABLISHED; + struct addrinfo *res; + r = getaddrinfo(s_data->r_host, s_data->r_port, NULL, &res); + if (r) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); + ev_break(EV_A_ EVBREAK_ONE); + return; + } - struct addrinfo *res; - r = getaddrinfo(s_data->r_host, s_data->r_port, NULL, &res); - if (r) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + if ((sock->c_sock = socket(s_data->s_addr->sa_family, SOCK_DGRAM, 0)) == -1) { + perror("socket"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } - if ((sock->c_sock = socket(s_data->s_addr->sa_family, SOCK_DGRAM, 0)) == -1) { - perror("socket"); - ev_break(EV_A_ EVBREAK_ONE); - return; + if (connect(sock->c_sock, res->ai_addr, res->ai_addrlen)) { + perror("connect"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + + if (fcntl(sock->c_sock, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + + freeaddrinfo(res); + + ev_timer_stop(EV_A_ &sock->tm_w); + sock->tm_w.repeat = 60. * 60. * 3.; + ev_timer_start(EV_A_ &sock->tm_w); + + ev_io_init(&sock->io_w, sc_cb, sock->c_sock, EV_READ); + sock->io_w.data = sock; + ev_io_start(EV_A_ &sock->io_w); } - if (connect(sock->c_sock, res->ai_addr, res->ai_addrlen)) { - perror("connect"); + assert(sock->status == TCP_ESTABLISHED); + + DBG("sending %zu bytes to client", (size_t)(sz - tcphdr->th_off * 4)); + sz = send(sock->c_sock, rbuf + tcphdr->th_off * 4, sz - tcphdr->th_off * 4, 0); + if (sz < 0) { + perror("send"); ev_break(EV_A_ EVBREAK_ONE); return; } - - freeaddrinfo(res); - - ev_timer_stop(EV_A_ &sock->tm_w); - sock->tm_w.repeat = 60. * 60. * 3.; - ev_timer_start(EV_A_ &sock->tm_w); - - ev_io_init(&sock->io_w, sc_cb, sock->c_sock, EV_READ); - sock->io_w.data = sock; - ev_io_start(EV_A_ &sock->io_w); + return; } - assert(sock->status == TCP_ESTABLISHED); + if (sz == 0) + abort(); - DBG("sending %zu bytes to client", (size_t)(sz - tcphdr->th_off * 4)); - sz = send(sock->c_sock, rbuf + tcphdr->th_off * 4, sz - tcphdr->th_off * 4, 0); - if (sz < 0) { - perror("send"); + if (errno != EINVAL) { + perror("recvfrom"); ev_break(EV_A_ EVBREAK_ONE); - return; } } @@ -317,6 +337,12 @@ int start_server(const char *s_host, const char *s_port, const char *r_host, con return 1; } + if (fcntl(s_data.s_sock, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + freeaddrinfo(res); + return 1; + } + struct ev_loop *loop = EV_DEFAULT; ev_io s_watcher; -- cgit v1.2.3-54-g00ecf