summaryrefslogtreecommitdiff
path: root/src/client.c
diff options
context:
space:
mode:
authorAlex Xu <alex_y_xu@yahoo.ca>2016-07-03 21:43:13 -0400
committerAlex Xu <alex_y_xu@yahoo.ca>2016-07-03 21:43:13 -0400
commit5834fc2036259c9f9924156638502bbf9aa15f06 (patch)
treea24d3f209ade640eb242fcc32e9103a6c3c4a1c5 /src/client.c
parent132b3161f97c9d3bb08b5f001efc4cae6adc109d (diff)
downloadudpastcp-5834fc2036259c9f9924156638502bbf9aa15f06.tar.xz
udpastcp-5834fc2036259c9f9924156638502bbf9aa15f06.zip
Use non-blocking IO, loop recvfrom.
Diffstat (limited to 'src/client.c')
-rw-r--r--src/client.c356
1 files changed, 184 insertions, 172 deletions
diff --git a/src/client.c b/src/client.c
index c29d733..21f6217 100644
--- a/src/client.c
+++ b/src/client.c
@@ -1,4 +1,6 @@
+#include <errno.h>
#include <ev.h>
+#include <fcntl.h>
#include <limits.h>
#include <netdb.h>
#include <netinet/in.h>
@@ -192,95 +194,95 @@ static void cc_cb(struct ev_loop *loop, ev_io *w, int revents __attribute__((unu
socklen_t pkt_addrlen = sizeof(struct sockaddr_in6);
ssize_t should_ssz, rsz, ssz;
- if ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) == -1) {
- perror("recvfrom");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ while ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) != -1) {
+ DBG("received %zd raw bytes on client", rsz);
- DBG("received %zd raw bytes on client", rsz);
+ if (pkt_addrlen > sizeof(struct sockaddr_in6))
+ abort();
- if (pkt_addrlen > sizeof(struct sockaddr_in6))
- abort();
-
- if ((size_t)rsz < sizeof(struct tcphdr))
- return;
+ if ((size_t)rsz < sizeof(struct tcphdr))
+ return;
- struct tcphdr *rhdr = (struct tcphdr *)rbuf;
+ struct tcphdr *rhdr = (struct tcphdr *)rbuf;
- struct o_c_sock *sock;
+ struct o_c_sock *sock;
- HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock);
+ HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock);
- if (!sock) {
- DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport));
- return;
- }
+ if (!sock) {
+ DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport));
+ return;
+ }
- if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) {
- DBG("SYN/ACK received, connection established");
+ if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) {
+ DBG("SYN/ACK received, connection established");
- sock->status = TCP_ESTABLISHED;
+ sock->status = TCP_ESTABLISHED;
- struct tcphdr shdr = {
- .th_sport = sock->l_port,
- .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
- .th_seq = htonl(sock->seq_num),
- .th_ack = rhdr->th_seq,
- .th_win = 65535,
- .th_flags = TH_ACK,
- .th_off = 5
- };
+ struct tcphdr shdr = {
+ .th_sport = sock->l_port,
+ .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
+ .th_seq = htonl(sock->seq_num),
+ .th_ack = rhdr->th_seq,
+ .th_win = 65535,
+ .th_flags = TH_ACK,
+ .th_off = 5
+ };
- sock->seq_num += sock->pending_data_size;
+ sock->seq_num += sock->pending_data_size;
- struct iovec iovs[2] = {
- { .iov_base = &shdr, .iov_len = sizeof(shdr) },
- { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size }
- };
+ struct iovec iovs[2] = {
+ { .iov_base = &shdr, .iov_len = sizeof(shdr) },
+ { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size }
+ };
- struct msghdr msghdr = {
- .msg_name = NULL,
- .msg_namelen = 0,
- .msg_iov = iovs,
- .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
- };
+ struct msghdr msghdr = {
+ .msg_name = NULL,
+ .msg_namelen = 0,
+ .msg_iov = iovs,
+ .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
+ };
- should_ssz = sizeof(shdr) + sock->pending_data_size;
- ssz = sendmsg(rsock->fd, &msghdr, 0);
+ should_ssz = sizeof(shdr) + sock->pending_data_size;
+ ssz = sendmsg(rsock->fd, &msghdr, 0);
- if (ssz < 0) {
- perror("sendmsg");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- } else if ((size_t)ssz != should_ssz) {
- fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
- }
+ if (ssz < 0) {
+ perror("sendmsg");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if ((size_t)ssz != should_ssz) {
+ fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
+ }
- free(sock->pending_data);
+ free(sock->pending_data);
- ev_timer_stop(EV_A_ &sock->tm_w);
- // this delay is not very important because one, it is OK if UDP
- // packets are lost, and two, they are only delayed until a new
- // connection is established. however, it is probably a good idea to
- // set this higher than the UDP ping delay if you are using one.
- ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.);
- ev_timer_start(EV_A_ &sock->tm_w);
- }
+ ev_timer_stop(EV_A_ &sock->tm_w);
+ // this delay is not very important because one, it is OK if UDP
+ // packets are lost, and two, they are only delayed until a new
+ // connection is established. however, it is probably a good idea to
+ // set this higher than the UDP ping delay if you are using one.
+ ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.);
+ ev_timer_start(EV_A_ &sock->tm_w);
+ }
- should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT;
- if (should_ssz > 0) {
- DBG("sending %zd bytes to client", should_ssz);
- ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen);
+ should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT;
+ if (should_ssz > 0) {
+ DBG("sending %zd bytes to client", should_ssz);
+ ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen);
- if (ssz < 0) {
- perror("sendto");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- } else if ((size_t)ssz != should_ssz) {
- fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
+ if (ssz < 0) {
+ perror("sendto");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if ((size_t)ssz != should_ssz) {
+ fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz);
+ }
}
}
+ if (errno != EAGAIN) {
+ perror("recvfrom");
+ ev_break(EV_A_ EVBREAK_ONE);
+ }
}
#define SIX_OR_FOUR(sa, six, four, neither) \
@@ -311,6 +313,11 @@ static int c_rsock_init(struct o_c_sock *sock, struct addrinfo *res) {
return 0;
}
+ if (fcntl(sock->rsock->fd, F_SETFL, O_NONBLOCK) == -1) {
+ perror("fcntl");
+ return 0;
+ }
+
struct sockaddr_storage our_addr;
socklen_t our_addr_len = sizeof(our_addr);
int r = getsockname(sock->rsock->fd, (struct sockaddr *)&our_addr, &our_addr_len);
@@ -331,136 +338,136 @@ static void cs_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) {
ssize_t sz;
char rbuf[65536];
- if ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) == -1) {
- perror("recvfrom");
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ while ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) != -1) {
+ DBG("received %zd bytes on server", sz);
- DBG("received %zd bytes on server", sz);
+ if (addresslen != c_data->s_addrlen)
+ abort();
- if (addresslen != c_data->s_addrlen)
- abort();
+ struct o_c_sock *sock;
+ HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock);
- struct o_c_sock *sock;
- HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock);
-
- if (!sock) {
- DBG("could not locate matching socket for client, initializing new connection");
- sock = calloc(1, sizeof(*sock));
-
- struct addrinfo *res;
- DBG("looking up %s:%s", c_data->r_host, c_data->r_port);
- // TODO: make this asynchronous
- int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res);
- if (r) {
- fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r));
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
+ if (!sock) {
+ DBG("could not locate matching socket for client, initializing new connection");
+ sock = calloc(1, sizeof(*sock));
+
+ struct addrinfo *res;
+ DBG("looking up %s:%s", c_data->r_host, c_data->r_port);
+ // TODO: make this asynchronous
+ int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res);
+ if (r) {
+ fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r));
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
+
+ sock->c_address = malloc(addresslen);
+ memcpy(sock->c_address, &c_data->pkt_addr, addresslen);
- sock->c_address = malloc(addresslen);
- memcpy(sock->c_address, &c_data->pkt_addr, addresslen);
+ HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock);
- HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock);
+ if (!sock->rsock) {
+ DBG("could not locate remote socket to host, initializing new raw socket");
+ if (!c_rsock_init(sock, res)) {
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ }
+ sock->rsock->c_data = c_data;
+
+ ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ);
+ sock->rsock->io_w.data = sock->rsock;
+ ev_io_start(EV_A_ &sock->rsock->io_w);
+
+ HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock);
+ }
- if (!sock->rsock) {
- DBG("could not locate remote socket to host, initializing new raw socket");
- if (!c_rsock_init(sock, res)) {
+ uint16_t l_port = reserve_port(sock->rsock->used_ports);
+ DBG("using port %hu", l_port);
+ if (!l_port) {
+ fputs("we ran out of ports?\n", stderr);
ev_break(EV_A_ EVBREAK_ONE);
return;
}
- sock->rsock->c_data = c_data;
+ sock->l_port = htons(l_port);
- ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ);
- sock->rsock->io_w.data = sock->rsock;
- ev_io_start(EV_A_ &sock->rsock->io_w);
+ HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock);
+ HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock);
- HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock);
- }
+ sock->seq_num = random();
- uint16_t l_port = reserve_port(sock->rsock->used_ports);
- DBG("using port %hu", l_port);
- if (!l_port) {
- fputs("we ran out of ports?\n", stderr);
- ev_break(EV_A_ EVBREAK_ONE);
- return;
- }
- sock->l_port = htons(l_port);
+ struct tcphdr buf = {
+ .th_sport = sock->l_port,
+ .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
+ .th_seq = htonl(sock->seq_num++),
+ .th_flags = TH_SYN,
+ .th_off = 5
+ };
+
+ sock->pending_data = malloc(sz);
+ memcpy(sock->pending_data, rbuf, sz);
+ sock->pending_data_size = sz;
+
+ DBG("sending SYN to remote");
+ sz = send(sock->rsock->fd, &buf, sizeof(buf), 0);
+ if (sz < 0) {
+ perror("send");
+ ev_break(EV_A_ EVBREAK_ONE);
+ return;
+ } else if ((size_t)sz != sizeof(buf)) {
+ fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz);
+ }
- HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock);
- HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock);
+ // resend SYN
- sock->seq_num = random();
+ ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]);
+ sock->tm_w.data = sock;
+ sock->syn_retries = 0;
+ c_adv_syn_tm(EV_A_ sock);
- struct tcphdr buf = {
+ sock->status = TCP_SYN_SENT;
+
+ return;
+ }
+
+ struct tcphdr tcp_hdr = {
.th_sport = sock->l_port,
.th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
- .th_seq = htonl(sock->seq_num++),
- .th_flags = TH_SYN,
- .th_off = 5
+ .th_seq = htonl(sock->seq_num),
+ .th_off = 5,
+ .th_win = 65535,
+ .th_flags = TH_PUSH
+ };
+
+ sock->seq_num += sz;
+
+ struct iovec iovs[2] = {
+ { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) },
+ { .iov_base = rbuf, .iov_len = sz }
};
- sock->pending_data = malloc(sz);
- memcpy(sock->pending_data, rbuf, sz);
- sock->pending_data_size = sz;
+ struct msghdr msghdr = {
+ .msg_name = NULL,
+ .msg_namelen = 0,
+ .msg_iov = iovs,
+ .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
+ };
- DBG("sending SYN to remote");
- sz = send(sock->rsock->fd, &buf, sizeof(buf), 0);
+ size_t should_send_size = sizeof(tcp_hdr) + sz;
+ DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz);
+ sz = sendmsg(sock->rsock->fd, &msghdr, 0);
if (sz < 0) {
- perror("send");
+ perror("sendmsg");
ev_break(EV_A_ EVBREAK_ONE);
return;
- } else if ((size_t)sz != sizeof(buf)) {
- fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz);
+ } else if ((size_t)sz != should_send_size) {
+ fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz);
}
-
- // resend SYN
-
- ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]);
- sock->tm_w.data = sock;
- sock->syn_retries = 0;
- c_adv_syn_tm(EV_A_ sock);
-
- sock->status = TCP_SYN_SENT;
-
- return;
+ ev_timer_again(EV_A_ &sock->tm_w);
}
-
- struct tcphdr tcp_hdr = {
- .th_sport = sock->l_port,
- .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port,
- .th_seq = htonl(sock->seq_num),
- .th_off = 5,
- .th_win = 65535,
- .th_flags = TH_PUSH
- };
-
- sock->seq_num += sz;
-
- struct iovec iovs[2] = {
- { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) },
- { .iov_base = rbuf, .iov_len = sz }
- };
-
- struct msghdr msghdr = {
- .msg_name = NULL,
- .msg_namelen = 0,
- .msg_iov = iovs,
- .msg_iovlen = sizeof(iovs) / sizeof(iovs[0])
- };
-
- size_t should_send_size = sizeof(tcp_hdr) + sz;
- DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz);
- sz = sendmsg(sock->rsock->fd, &msghdr, 0);
- if (sz < 0) {
- perror("sendmsg");
+ if (errno != EAGAIN) {
+ perror("recvfrom");
ev_break(EV_A_ EVBREAK_ONE);
- return;
- } else if ((size_t)sz != should_send_size) {
- fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz);
}
- ev_timer_again(EV_A_ &sock->tm_w);
}
static void c_cleanup() {
@@ -509,6 +516,11 @@ int start_client(const char *s_host, const char *s_port, const char *r_host, con
freeaddrinfo(res);
+ if (fcntl(c_data.s_sock, F_SETFL, O_NONBLOCK) == -1) {
+ perror("fcntl");
+ return 4;
+ }
+
global_c_data = &c_data;
atexit(c_cleanup);