summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Xu <alex_y_xu@yahoo.ca>2016-07-03 21:43:13 -0400
committerAlex Xu <alex_y_xu@yahoo.ca>2016-07-03 21:43:13 -0400
commit5834fc2036259c9f9924156638502bbf9aa15f06 (patch)
treea24d3f209ade640eb242fcc32e9103a6c3c4a1c5 /src
parent132b3161f97c9d3bb08b5f001efc4cae6adc109d (diff)
downloadudpastcp-5834fc2036259c9f9924156638502bbf9aa15f06.tar.xz
udpastcp-5834fc2036259c9f9924156638502bbf9aa15f06.zip
Use non-blocking IO, loop recvfrom.
Diffstat (limited to 'src')
-rw-r--r--src/client.c356
-rw-r--r--src/server.c312
2 files changed, 353 insertions, 315 deletions
diff --git a/src/client.c b/src/client.c
index c29d733..21f6217 100644
--- a/src/client.c
+++ b/src/client.c
@@ -1,4 +1,6 @@
+#include <errno.h>
#include <ev.h>
+#include <fcntl.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
@@ -192,95 +194,95 @@ static void cc_cb(struct ev_loop *loop, ev_io *w, int revents __attribute__((unu
socklen_t pkt_addrlen = sizeof(struct sockaddr_in6);
ssize_t should_ssz, rsz, ssz;
- if ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) == -1) {
- perror("recvfrom");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ while ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) != -1) {
+ DBG("received %zd raw bytes on client", rsz);
- DBG("received %zd raw bytes on client", rsz);
+ if (pkt_addrlen > sizeof(struct sockaddr_in6))
+ abort();
- if (pkt_addrlen > sizeof(struct sockaddr_in6))
- abort();
-
- if ((size_t)rsz < sizeof(struct tcphdr))
- return;
+ if ((size_t)rsz < sizeof(struct tcphdr))
+ return;
- struct tcphdr *rhdr = (struct tcphdr *)rbuf;
+ struct tcphdr *rhdr = (struct tcphdr *)rbuf;
- struct o_c_sock *sock;
+ struct o_c_sock *sock;
- HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock);
+ HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock);
- if (!sock) {
- DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport));
- return;
- }
+ if (!sock) {
+ DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport));
+ return;
+ }
- if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) {
- DBG("SYN/ACK received, connection established");
+ if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) {
+ DBG("SYN/ACK received, connection established");
- sock->status = TCP_ESTABLISHED;
+ sock->status = TCP_ESTABLISHED;
- struct tcphdr shdr = {
- .th_sport = sock->l_port,
- .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
- .th_seq = htonl(sock->seq_num),
- .th_ack = rhdr->th_seq,
- .th_win = 65535,
- .th_flags = TH_ACK,
- .th_off = 5
- };
+ struct tcphdr shdr = {
+ .th_sport = sock->l_port,
+ .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
+ .th_seq = htonl(sock->seq_num),
+ .th_ack = rhdr->th_seq,
+ .th_win = 65535,
+ .th_flags = TH_ACK,
+ .th_off = 5
+ };
- sock->seq_num += sock->pending_data_size;
+ sock->seq_num += sock->pending_data_size;
- struct iovec iovs[2] = {
- { .iov_base = &shdr, .iov_len = sizeof(shdr) },
- { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size }
- };
+ struct iovec iovs[2] = {
+ { .iov_base = &shdr, .iov_len = sizeof(shdr) },
+ { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size }
+ };
- struct msghdr msghdr = {
- .msg_name = NULL,
- .msg_namelen = 0,
- .msg_iov = iovs,
- .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
- };
+ struct msghdr msghdr = {
+ .msg_name = NULL,
+ .msg_namelen = 0,
+ .msg_iov = iovs,
+ .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
+ };
- should_ssz = sizeof(shdr) + sock->pending_data_size;
- ssz = sendmsg(rsock->fd, &msghdr, 0);
+ should_ssz = sizeof(shdr) + sock->pending_data_size;
+ ssz = sendmsg(rsock->fd, &msghdr, 0);
- if (ssz < 0) {
- perror("sendmsg");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- } else if ((size_t)ssz != should_ssz) {
- fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
- }
+ if (ssz < 0) {
+ perror("sendmsg");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if ((size_t)ssz != should_ssz) {
+ fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
+ }
- free(sock->pending_data);
+ free(sock->pending_data);
- ev_timer_stop(EV_A_ &sock->tm_w);
- // this delay is not very important because one, it is OK if UDP
- // packets are lost, and two, they are only delayed until a new
- // connection is established. however, it is probably a good idea to
- // set this higher than the UDP ping delay if you are using one.
- ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.);
- ev_timer_start(EV_A_ &sock->tm_w);
- }
+ ev_timer_stop(EV_A_ &sock->tm_w);
+ // this delay is not very important because one, it is OK if UDP
+ // packets are lost, and two, they are only delayed until a new
+ // connection is established. however, it is probably a good idea to
+ // set this higher than the UDP ping delay if you are using one.
+ ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.);
+ ev_timer_start(EV_A_ &sock->tm_w);
+ }
- should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT;
- if (should_ssz > 0) {
- DBG("sending %zd bytes to client", should_ssz);
- ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen);
+ should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT;
+ if (should_ssz > 0) {
+ DBG("sending %zd bytes to client", should_ssz);
+ ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen);
- if (ssz < 0) {
- perror("sendto");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- } else if ((size_t)ssz != should_ssz) {
- fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
+ if (ssz < 0) {
+ perror("sendto");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if ((size_t)ssz != should_ssz) {
+ fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
+ }
}
}
+ if (errno != EAGAIN) {
+ perror("recvfrom");
+ ev_break(EV_A_ EVBREAK_ONE);
+ }
}
#define SIX_OR_FOUR(sa, six, four, neither) \
@@ -311,6 +313,11 @@ static int c_rsock_init(struct o_c_sock *sock, struct addrinfo *res) {
return 0;
}
+ if (fcntl(sock->rsock->fd, F_SETFL, O_NONBLOCK) == -1) {
+ perror("fcntl");
+ return 0;
+ }
+
struct sockaddr_storage our_addr;
socklen_t our_addr_len = sizeof(our_addr);
int r = getsockname(sock->rsock->fd, (struct sockaddr *)&our_addr, &our_addr_len);
@@ -331,136 +338,136 @@ static void cs_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) {
ssize_t sz;
char rbuf[65536];
- if ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) == -1) {
- perror("recvfrom");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ while ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) != -1) {
+ DBG("received %zd bytes on server", sz);
- DBG("received %zd bytes on server", sz);
+ if (addresslen != c_data->s_addrlen)
+ abort();
- if (addresslen != c_data->s_addrlen)
- abort();
+ struct o_c_sock *sock;
+ HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock);
- struct o_c_sock *sock;
- HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock);
-
- if (!sock) {
- DBG("could not locate matching socket for client, initializing new connection");
- sock = calloc(1, sizeof(*sock));
-
- struct addrinfo *res;
- DBG("looking up %s:%s", c_data->r_host, c_data->r_port);
- // TODO: make this asynchronous
- int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res);
- if (r) {
- fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r));
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ if (!sock) {
+ DBG("could not locate matching socket for client, initializing new connection");
+ sock = calloc(1, sizeof(*sock));
+
+ struct addrinfo *res;
+ DBG("looking up %s:%s", c_data->r_host, c_data->r_port);
+ // TODO: make this asynchronous
+ int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res);
+ if (r) {
+ fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r));
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
+
+ sock->c_address = malloc(addresslen);
+ memcpy(sock->c_address, &c_data->pkt_addr, addresslen);
- sock->c_address = malloc(addresslen);
- memcpy(sock->c_address, &c_data->pkt_addr, addresslen);
+ HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock);
- HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock);
+ if (!sock->rsock) {
+ DBG("could not locate remote socket to host, initializing new raw socket");
+ if (!c_rsock_init(sock, res)) {
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
+ sock->rsock->c_data = c_data;
+
+ ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ);
+ sock->rsock->io_w.data = sock->rsock;
+ ev_io_start(EV_A_ &sock->rsock->io_w);
+
+ HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock);
+ }
- if (!sock->rsock) {
- DBG("could not locate remote socket to host, initializing new raw socket");
- if (!c_rsock_init(sock, res)) {
+ uint16_t l_port = reserve_port(sock->rsock->used_ports);
+ DBG("using port %hu", l_port);
+ if (!l_port) {
+ fputs("we ran out of ports?\n", stderr);
ev_break(EV_A_ EVBREAK_ONE);
return;
}
- sock->rsock->c_data = c_data;
+ sock->l_port = htons(l_port);
- ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ);
- sock->rsock->io_w.data = sock->rsock;
- ev_io_start(EV_A_ &sock->rsock->io_w);
+ HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock);
+ HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock);
- HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock);
- }
+ sock->seq_num = random();
- uint16_t l_port = reserve_port(sock->rsock->used_ports);
- DBG("using port %hu", l_port);
- if (!l_port) {
- fputs("we ran out of ports?\n", stderr);
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
- sock->l_port = htons(l_port);
+ struct tcphdr buf = {
+ .th_sport = sock->l_port,
+ .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
+ .th_seq = htonl(sock->seq_num++),
+ .th_flags = TH_SYN,
+ .th_off = 5
+ };
+
+ sock->pending_data = malloc(sz);
+ memcpy(sock->pending_data, rbuf, sz);
+ sock->pending_data_size = sz;
+
+ DBG("sending SYN to remote");
+ sz = send(sock->rsock->fd, &buf, sizeof(buf), 0);
+ if (sz < 0) {
+ perror("send");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if ((size_t)sz != sizeof(buf)) {
+ fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz);
+ }
- HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock);
- HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock);
+ // resend SYN
- sock->seq_num = random();
+ ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]);
+ sock->tm_w.data = sock;
+ sock->syn_retries = 0;
+ c_adv_syn_tm(EV_A_ sock);
- struct tcphdr buf = {
+ sock->status = TCP_SYN_SENT;
+
+ return;
+ }
+
+ struct tcphdr tcp_hdr = {
.th_sport = sock->l_port,
.th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
- .th_seq = htonl(sock->seq_num++),
- .th_flags = TH_SYN,
- .th_off = 5
+ .th_seq = htonl(sock->seq_num),
+ .th_off = 5,
+ .th_win = 65535,
+ .th_flags = TH_PUSH
+ };
+
+ sock->seq_num += sz;
+
+ struct iovec iovs[2] = {
+ { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) },
+ { .iov_base = rbuf, .iov_len = sz }
};
- sock->pending_data = malloc(sz);
- memcpy(sock->pending_data, rbuf, sz);
- sock->pending_data_size = sz;
+ struct msghdr msghdr = {
+ .msg_name = NULL,
+ .msg_namelen = 0,
+ .msg_iov = iovs,
+ .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
+ };
- DBG("sending SYN to remote");
- sz = send(sock->rsock->fd, &buf, sizeof(buf), 0);
+ size_t should_send_size = sizeof(tcp_hdr) + sz;
+ DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz);
+ sz = sendmsg(sock->rsock->fd, &msghdr, 0);
if (sz < 0) {
- perror("send");
+ perror("sendmsg");
ev_break(EV_A_ EVBREAK_ONE);
return;
- } else if ((size_t)sz != sizeof(buf)) {
- fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz);
+ } else if ((size_t)sz != should_send_size) {
+ fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz);
}
-
- // resend SYN
-
- ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]);
- sock->tm_w.data = sock;
- sock->syn_retries = 0;
- c_adv_syn_tm(EV_A_ sock);
-
- sock->status = TCP_SYN_SENT;
-
- return;
+ ev_timer_again(EV_A_ &sock->tm_w);
}
-
- struct tcphdr tcp_hdr = {
- .th_sport = sock->l_port,
- .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
- .th_seq = htonl(sock->seq_num),
- .th_off = 5,
- .th_win = 65535,
- .th_flags = TH_PUSH
- };
-
- sock->seq_num += sz;
-
- struct iovec iovs[2] = {
- { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) },
- { .iov_base = rbuf, .iov_len = sz }
- };
-
- struct msghdr msghdr = {
- .msg_name = NULL,
- .msg_namelen = 0,
- .msg_iov = iovs,
- .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
- };
-
- size_t should_send_size = sizeof(tcp_hdr) + sz;
- DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz);
- sz = sendmsg(sock->rsock->fd, &msghdr, 0);
- if (sz < 0) {
- perror("sendmsg");
+ if (errno != EAGAIN) {
+ perror("recvfrom");
ev_break(EV_A_ EVBREAK_ONE);
- return;
- } else if ((size_t)sz != should_send_size) {
- fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz);
}
- ev_timer_again(EV_A_ &sock->tm_w);
}
static void c_cleanup() {
@@ -509,6 +516,11 @@ int start_client(const char *s_host, const char *s_port, const char *r_host, con
freeaddrinfo(res);
+ if (fcntl(c_data.s_sock, F_SETFL, O_NONBLOCK) == -1) {
+ perror("fcntl");
+ return 4;
+ }
+
global_c_data = &c_data;
atexit(c_cleanup);
diff --git a/src/server.c b/src/server.c
index 158c0af..462212f 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1,5 +1,7 @@
#include <assert.h>
+#include <errno.h>
#include <ev.h>
+#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@@ -84,51 +86,58 @@ static void sc_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) {
DBG("-- entering sc_cb --");
- if ((sz = recv(w->fd, rbuf, sizeof(rbuf), 0)) < 0) {
- perror("recv");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ while ((sz = recv(w->fd, rbuf, sizeof(rbuf), 0)) > 0) {
+ DBG("received %zd bytes matching socket %p", sz, sock);
- DBG("received %zd bytes matching socket %p", sz, sock);
+ struct tcphdr hdr;
+ s_prep_c_addr(sock, &hdr);
+ hdr.th_off = 5;
- struct tcphdr hdr;
- s_prep_c_addr(sock, &hdr);
- hdr.th_off = 5;
+ struct iovec iovs[2] = {
+ { .iov_base = &hdr, .iov_len = sizeof(hdr) },
+ { .iov_base = rbuf, .iov_len = sz }
+ };
- struct iovec iovs[2] = {
- { .iov_base = &hdr, .iov_len = sizeof(hdr) },
- { .iov_base = rbuf, .iov_len = sz }
- };
+ in_port_t c_port = ((struct sockaddr_in *)&sock->c_addr)->sin_port;
+ ((struct sockaddr_in *)&sock->c_addr)->sin_port = 0;
- in_port_t c_port = ((struct sockaddr_in *)&sock->c_addr)->sin_port;
- ((struct sockaddr_in *)&sock->c_addr)->sin_port = 0;
+ struct msghdr msghdr = {
+ .msg_name = &sock->c_addr,
+ .msg_namelen = sock->s_data->s_addrlen,
+ .msg_iov = iovs,
+ .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
+ };
- struct msghdr msghdr = {
- .msg_name = &sock->c_addr,
- .msg_namelen = sock->s_data->s_addrlen,
- .msg_iov = iovs,
- .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
- };
+ size_t should_send_size = sizeof(hdr) + sz;
- size_t should_send_size = sizeof(hdr) + sz;
+ assert(sock->status == TCP_ESTABLISHED);
- assert(sock->status == TCP_ESTABLISHED);
+ DBG("sending %zd bytes to client", should_send_size);
+ sz = sendmsg(sock->s_data->s_sock, &msghdr, 0);
- DBG("sending %zd bytes to client", should_send_size);
- sz = sendmsg(sock->s_data->s_sock, &msghdr, 0);
+ ((struct sockaddr_in *)&sock->c_addr)->sin_port = c_port;
- ((struct sockaddr_in *)&sock->c_addr)->sin_port = c_port;
+ if (sz < 0) {
+ perror("sendmsg");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if ((size_t)sz != should_send_size) {
+ fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz);
+ }
+
+ ev_timer_again(EV_A_ &sock->tm_w);
- if (sz < 0) {
- perror("sendmsg");
- ev_break(EV_A_ EVBREAK_ONE);
return;
- } else if ((size_t)sz != should_send_size) {
- fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz);
}
- ev_timer_again(EV_A_ &sock->tm_w);
+ if (sz == 0)
+ abort();
+
+ if (errno != EAGAIN) {
+ perror("recv");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
}
static void ss_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) {
@@ -140,158 +149,169 @@ static void ss_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) {
DBG("-- entering ss_cb --");
- if ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&s_data->pkt_addr, &c_addrlen)) < 0) {
- perror("recvfrom");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
-
- if (c_addrlen != s_data->s_addrlen)
- abort();
+ while ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&s_data->pkt_addr, &c_addrlen)) > 0) {
+ if (c_addrlen != s_data->s_addrlen)
+ abort();
#ifdef DEBUG
- char hbuf[NI_MAXHOST];
- r = getnameinfo((struct sockaddr *)&s_data->pkt_addr, c_addrlen, hbuf, sizeof(hbuf), NULL, 0, NI_NUMERICHOST);
- if (r) {
- fprintf(stderr, "getnameinfo: %s\n", gai_strerror(r));
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
- DBG("received %zd bytes from %s", sz, hbuf);
+ char hbuf[NI_MAXHOST];
+ r = getnameinfo((struct sockaddr *)&s_data->pkt_addr, c_addrlen, hbuf, sizeof(hbuf), NULL, 0, NI_NUMERICHOST);
+ if (r) {
+ fprintf(stderr, "getnameinfo: %s\n", gai_strerror(r));
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
+ DBG("received %zd bytes from %s", sz, hbuf);
#endif
- if ((size_t)sz < sizeof(struct tcphdr)) {
- DBG("packet is smaller than TCP header, ignoring");
- return;
- }
+ if ((size_t)sz < sizeof(struct tcphdr)) {
+ DBG("packet is smaller than TCP header, ignoring");
+ return;
+ }
- struct tcphdr *tcphdr = (struct tcphdr *)rbuf;
+ struct tcphdr *tcphdr = (struct tcphdr *)rbuf;
- DBG("packet received on port %hu", ntohs(tcphdr->th_dport));
+ DBG("packet received on port %hu", ntohs(tcphdr->th_dport));
- if (tcphdr->th_dport != ((struct sockaddr_in *)s_data->s_addr)->sin_port) {
- DBG("packet should be on port %hu, ignoring", ntohs(((struct sockaddr_in *)s_data->s_addr)->sin_port));
- return;
- }
+ if (tcphdr->th_dport != ((struct sockaddr_in *)s_data->s_addr)->sin_port) {
+ DBG("packet should be on port %hu, ignoring", ntohs(((struct sockaddr_in *)s_data->s_addr)->sin_port));
+ return;
+ }
- struct o_s_sock *sock;
+ struct o_s_sock *sock;
- const uint8_t th_flags = tcphdr->th_flags;
+ const uint8_t th_flags = tcphdr->th_flags;
- ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = tcphdr->th_sport;
+ ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = tcphdr->th_sport;
- HASH_FIND(hh, s_data->o_socks_by_caddr, &s_data->pkt_addr, c_addrlen, sock);
+ HASH_FIND(hh, s_data->o_socks_by_caddr, &s_data->pkt_addr, c_addrlen, sock);
- if (!sock) {
- DBG("could not locate matching socket for client addr");
+ if (!sock) {
+ DBG("could not locate matching socket for client addr");
- if (th_flags == TH_SYN) {
- sock = malloc(sizeof(*sock));
+ if (th_flags == TH_SYN) {
+ sock = malloc(sizeof(*sock));
- DBG("packet was SYN, initializing new connection @ %p", sock);
+ DBG("packet was SYN, initializing new connection @ %p", sock);
- memcpy(&sock->c_addr, &s_data->pkt_addr, c_addrlen);
+ memcpy(&sock->c_addr, &s_data->pkt_addr, c_addrlen);
- sock->s_data = s_data;
- sock->seq_num = random();
- sock->c_sock = -1;
- sock->status = TCP_SYN_RECV;
+ sock->s_data = s_data;
+ sock->seq_num = random();
+ sock->c_sock = -1;
+ sock->status = TCP_SYN_RECV;
- struct tcphdr buf = {
- .th_sport = tcphdr->th_dport,
- .th_dport = tcphdr->th_sport,
- .th_seq = htonl(sock->seq_num),
- .th_ack = tcphdr->th_seq,
- .th_flags = TH_SYN | TH_ACK,
- .th_off = 5
- };
+ struct tcphdr buf = {
+ .th_sport = tcphdr->th_dport,
+ .th_dport = tcphdr->th_sport,
+ .th_seq = htonl(sock->seq_num),
+ .th_ack = tcphdr->th_seq,
+ .th_flags = TH_SYN | TH_ACK,
+ .th_off = 5
+ };
- HASH_ADD(hh, s_data->o_socks_by_caddr, c_addr, c_addrlen, sock);
+ HASH_ADD(hh, s_data->o_socks_by_caddr, c_addr, c_addrlen, sock);
- ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = htons(0);
+ ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = htons(0);
- DBG("sending SYN/ACK");
- if ((sz = sendto(w->fd, &buf, sizeof(buf), 0, (struct sockaddr *)&s_data->pkt_addr, s_data->s_addrlen)) == -1) {
- perror("sendto");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- } else if (sz != sizeof(buf)) {
- fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz);
+ DBG("sending SYN/ACK");
+ if ((sz = sendto(w->fd, &buf, sizeof(buf), 0, (struct sockaddr *)&s_data->pkt_addr, s_data->s_addrlen)) == -1) {
+ perror("sendto");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if (sz != sizeof(buf)) {
+ fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz);
+ }
+
+ ev_init(&sock->tm_w, s_tm_cb);
+ sock->tm_w.repeat = 10. * 60.;
+ sock->tm_w.data = sock;
+ ev_timer_again(EV_A_ &sock->tm_w);
+ } else {
+ DBG("packet was not SYN, ignoring");
}
- ev_init(&sock->tm_w, s_tm_cb);
- sock->tm_w.repeat = 10. * 60.;
- sock->tm_w.data = sock;
- ev_timer_again(EV_A_ &sock->tm_w);
- } else {
- DBG("packet was not SYN, ignoring");
+ return;
}
- return;
- }
+ if (tcphdr->th_off != 5) {
+ DBG("TCP options were specified, dropping packet");
+ return;
+ }
- if (tcphdr->th_off != 5) {
- DBG("TCP options were specified, dropping packet");
- return;
- }
+ if (th_flags == TH_RST) {
+ DBG("RST received, cleaning up socket");
+ sock->status = TCP_CLOSE;
+ s_sock_cleanup(EV_A_ sock);
+ }
- if (th_flags == TH_RST) {
- DBG("RST received, cleaning up socket");
- sock->status = TCP_CLOSE;
- s_sock_cleanup(EV_A_ sock);
- }
+ if (th_flags & ~(TH_PUSH | TH_ACK)) {
+ DBG("TCP flags not PSH and/or ACK, dropping packet");
+ return;
+ }
- if (th_flags & ~(TH_PUSH | TH_ACK)) {
- DBG("TCP flags not PSH and/or ACK, dropping packet");
- return;
- }
+ if (sock->status == TCP_SYN_RECV) {
+ assert(sock->c_sock == -1);
- if (sock->status == TCP_SYN_RECV) {
- assert(sock->c_sock == -1);
+ DBG("no UDP socket for this connection, shifting to ESTABLISHED");
- DBG("no UDP socket for this connection, shifting to ESTABLISHED");
+ sock->status = TCP_ESTABLISHED;
- sock->status = TCP_ESTABLISHED;
+ struct addrinfo *res;
+ r = getaddrinfo(s_data->r_host, s_data->r_port, NULL, &res);
+ if (r) {
+ fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r));
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
- struct addrinfo *res;
- r = getaddrinfo(s_data->r_host, s_data->r_port, NULL, &res);
- if (r) {
- fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r));
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ if ((sock->c_sock = socket(s_data->s_addr->sa_family, SOCK_DGRAM, 0)) == -1) {
+ perror("socket");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
- if ((sock->c_sock = socket(s_data->s_addr->sa_family, SOCK_DGRAM, 0)) == -1) {
- perror("socket");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
+ if (connect(sock->c_sock, res->ai_addr, res->ai_addrlen)) {
+ perror("connect");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
+
+ if (fcntl(sock->c_sock, F_SETFL, O_NONBLOCK) == -1) {
+ perror("fcntl");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
+
+ freeaddrinfo(res);
+
+ ev_timer_stop(EV_A_ &sock->tm_w);
+ sock->tm_w.repeat = 60. * 60. * 3.;
+ ev_timer_start(EV_A_ &sock->tm_w);
+
+ ev_io_init(&sock->io_w, sc_cb, sock->c_sock, EV_READ);
+ sock->io_w.data = sock;
+ ev_io_start(EV_A_ &sock->io_w);
}
- if (connect(sock->c_sock, res->ai_addr, res->ai_addrlen)) {
- perror("connect");
+ assert(sock->status == TCP_ESTABLISHED);
+
+ DBG("sending %zu bytes to client", (size_t)(sz - tcphdr->th_off * 4));
+ sz = send(sock->c_sock, rbuf + tcphdr->th_off * 4, sz - tcphdr->th_off * 4, 0);
+ if (sz < 0) {
+ perror("send");
ev_break(EV_A_ EVBREAK_ONE);
return;
}
-
- freeaddrinfo(res);
-
- ev_timer_stop(EV_A_ &sock->tm_w);
- sock->tm_w.repeat = 60. * 60. * 3.;
- ev_timer_start(EV_A_ &sock->tm_w);
-
- ev_io_init(&sock->io_w, sc_cb, sock->c_sock, EV_READ);
- sock->io_w.data = sock;
- ev_io_start(EV_A_ &sock->io_w);
+ return;
}
- assert(sock->status == TCP_ESTABLISHED);
+ if (sz == 0)
+ abort();
- DBG("sending %zu bytes to client", (size_t)(sz - tcphdr->th_off * 4));
- sz = send(sock->c_sock, rbuf + tcphdr->th_off * 4, sz - tcphdr->th_off * 4, 0);
- if (sz < 0) {
- perror("send");
+ if (errno != EINVAL) {
+ perror("recvfrom");
ev_break(EV_A_ EVBREAK_ONE);
- return;
}
}
@@ -317,6 +337,12 @@ int start_server(const char *s_host, const char *s_port, const char *r_host, con
return 1;
}
+ if (fcntl(s_data.s_sock, F_SETFL, O_NONBLOCK) == -1) {
+ perror("fcntl");
+ freeaddrinfo(res);
+ return 1;
+ }
+
struct ev_loop *loop = EV_DEFAULT;
ev_io s_watcher;