diff options
author | Alex Xu <alex_y_xu@yahoo.ca> | 2016-07-03 21:43:13 -0400 |
---|---|---|
committer | Alex Xu <alex_y_xu@yahoo.ca> | 2016-07-03 21:43:13 -0400 |
commit | 5834fc2036259c9f9924156638502bbf9aa15f06 (patch) | |
tree | a24d3f209ade640eb242fcc32e9103a6c3c4a1c5 /src/client.c | |
parent | 132b3161f97c9d3bb08b5f001efc4cae6adc109d (diff) | |
download | udpastcp-5834fc2036259c9f9924156638502bbf9aa15f06.tar.xz udpastcp-5834fc2036259c9f9924156638502bbf9aa15f06.zip |
Use non-blocking IO, loop recvfrom.
Diffstat (limited to 'src/client.c')
-rw-r--r-- | src/client.c | 356 |
1 files changed, 184 insertions, 172 deletions
diff --git a/src/client.c b/src/client.c index c29d733..21f6217 100644 --- a/src/client.c +++ b/src/client.c @@ -1,4 +1,6 @@ +#include <errno.h> #include <ev.h> +#include <fcntl.h> #include <limits.h> #include <netdb.h> #include <netinet/in.h> @@ -192,95 +194,95 @@ static void cc_cb(struct ev_loop *loop, ev_io *w, int revents __attribute__((unu socklen_t pkt_addrlen = sizeof(struct sockaddr_in6); ssize_t should_ssz, rsz, ssz; - if ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) == -1) { - perror("recvfrom"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + while ((rsz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&rsock->c_data->pkt_addr, &pkt_addrlen)) != -1) { + DBG("received %zd raw bytes on client", rsz); - DBG("received %zd raw bytes on client", rsz); + if (pkt_addrlen > sizeof(struct sockaddr_in6)) + abort(); - if (pkt_addrlen > sizeof(struct sockaddr_in6)) - abort(); - - if ((size_t)rsz < sizeof(struct tcphdr)) - return; + if ((size_t)rsz < sizeof(struct tcphdr)) + return; - struct tcphdr *rhdr = (struct tcphdr *)rbuf; + struct tcphdr *rhdr = (struct tcphdr *)rbuf; - struct o_c_sock *sock; + struct o_c_sock *sock; - HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock); + HASH_FIND(hh_lp, rsock->o_socks_by_lport, &rhdr->th_dport, sizeof(in_port_t), sock); - if (!sock) { - DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport)); - return; - } + if (!sock) { + DBG("could not find conn with lport %hu", ntohs(rhdr->th_dport)); + return; + } - if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) { - DBG("SYN/ACK received, connection established"); + if (sock->status == TCP_SYN_SENT && rhdr->th_flags == (TH_SYN | TH_ACK)) { + DBG("SYN/ACK received, connection established"); - sock->status = TCP_ESTABLISHED; + sock->status = TCP_ESTABLISHED; - struct tcphdr shdr = { - .th_sport = sock->l_port, - .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, - .th_seq = htonl(sock->seq_num), - .th_ack = rhdr->th_seq, - .th_win = 65535, - .th_flags = TH_ACK, - .th_off = 5 - }; + struct tcphdr shdr = { + .th_sport = sock->l_port, + .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, + .th_seq = htonl(sock->seq_num), + .th_ack = rhdr->th_seq, + .th_win = 65535, + .th_flags = TH_ACK, + .th_off = 5 + }; - sock->seq_num += sock->pending_data_size; + sock->seq_num += sock->pending_data_size; - struct iovec iovs[2] = { - { .iov_base = &shdr, .iov_len = sizeof(shdr) }, - { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size } - }; + struct iovec iovs[2] = { + { .iov_base = &shdr, .iov_len = sizeof(shdr) }, + { .iov_base = sock->pending_data, .iov_len = sock->pending_data_size } + }; - struct msghdr msghdr = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = iovs, - .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) - }; + struct msghdr msghdr = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iovs, + .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) + }; - should_ssz = sizeof(shdr) + sock->pending_data_size; - ssz = sendmsg(rsock->fd, &msghdr, 0); + should_ssz = sizeof(shdr) + sock->pending_data_size; + ssz = sendmsg(rsock->fd, &msghdr, 0); - if (ssz < 0) { - perror("sendmsg"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } else if ((size_t)ssz != should_ssz) { - fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); - } + if (ssz < 0) { + perror("sendmsg"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)ssz != should_ssz) { + fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); + } - free(sock->pending_data); + free(sock->pending_data); - ev_timer_stop(EV_A_ &sock->tm_w); - // this delay is not very important because one, it is OK if UDP - // packets are lost, and two, they are only delayed until a new - // connection is established. however, it is probably a good idea to - // set this higher than the UDP ping delay if you are using one. - ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.); - ev_timer_start(EV_A_ &sock->tm_w); - } + ev_timer_stop(EV_A_ &sock->tm_w); + // this delay is not very important because one, it is OK if UDP + // packets are lost, and two, they are only delayed until a new + // connection is established. however, it is probably a good idea to + // set this higher than the UDP ping delay if you are using one. + ev_timer_init(&sock->tm_w, c_tm_cb, 10. * 60., 10. * 60.); + ev_timer_start(EV_A_ &sock->tm_w); + } - should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT; - if (should_ssz > 0) { - DBG("sending %zd bytes to client", should_ssz); - ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen); + should_ssz = rsz - rhdr->th_off * 32 / CHAR_BIT; + if (should_ssz > 0) { + DBG("sending %zd bytes to client", should_ssz); + ssz = sendto(rsock->c_data->s_sock, rbuf + rhdr->th_off * 32 / CHAR_BIT, should_ssz, 0, sock->c_address, rsock->c_data->s_addrlen); - if (ssz < 0) { - perror("sendto"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } else if ((size_t)ssz != should_ssz) { - fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); + if (ssz < 0) { + perror("sendto"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)ssz != should_ssz) { + fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)ssz > should_ssz ? "expanded" : "truncated", should_ssz, ssz); + } } } + if (errno != EAGAIN) { + perror("recvfrom"); + ev_break(EV_A_ EVBREAK_ONE); + } } #define SIX_OR_FOUR(sa, six, four, neither) \ @@ -311,6 +313,11 @@ static int c_rsock_init(struct o_c_sock *sock, struct addrinfo *res) { return 0; } + if (fcntl(sock->rsock->fd, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + return 0; + } + struct sockaddr_storage our_addr; socklen_t our_addr_len = sizeof(our_addr); int r = getsockname(sock->rsock->fd, (struct sockaddr *)&our_addr, &our_addr_len); @@ -331,136 +338,136 @@ static void cs_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { ssize_t sz; char rbuf[65536]; - if ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) == -1) { - perror("recvfrom"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + while ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&c_data->pkt_addr, &addresslen)) != -1) { + DBG("received %zd bytes on server", sz); - DBG("received %zd bytes on server", sz); + if (addresslen != c_data->s_addrlen) + abort(); - if (addresslen != c_data->s_addrlen) - abort(); + struct o_c_sock *sock; + HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock); - struct o_c_sock *sock; - HASH_FIND(hh_ca, c_data->o_socks_by_caddr, &c_data->pkt_addr, addresslen, sock); - - if (!sock) { - DBG("could not locate matching socket for client, initializing new connection"); - sock = calloc(1, sizeof(*sock)); - - struct addrinfo *res; - DBG("looking up %s:%s", c_data->r_host, c_data->r_port); - // TODO: make this asynchronous - int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res); - if (r) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + if (!sock) { + DBG("could not locate matching socket for client, initializing new connection"); + sock = calloc(1, sizeof(*sock)); + + struct addrinfo *res; + DBG("looking up %s:%s", c_data->r_host, c_data->r_port); + // TODO: make this asynchronous + int r = getaddrinfo(c_data->r_host, c_data->r_port, NULL, &res); + if (r) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + + sock->c_address = malloc(addresslen); + memcpy(sock->c_address, &c_data->pkt_addr, addresslen); - sock->c_address = malloc(addresslen); - memcpy(sock->c_address, &c_data->pkt_addr, addresslen); + HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock); - HASH_FIND(hh, c_data->o_rsocks, res->ai_addr, res->ai_addrlen, sock->rsock); + if (!sock->rsock) { + DBG("could not locate remote socket to host, initializing new raw socket"); + if (!c_rsock_init(sock, res)) { + ev_break(EV_A_ EVBREAK_ONE); + return; + } + sock->rsock->c_data = c_data; + + ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ); + sock->rsock->io_w.data = sock->rsock; + ev_io_start(EV_A_ &sock->rsock->io_w); + + HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock); + } - if (!sock->rsock) { - DBG("could not locate remote socket to host, initializing new raw socket"); - if (!c_rsock_init(sock, res)) { + uint16_t l_port = reserve_port(sock->rsock->used_ports); + DBG("using port %hu", l_port); + if (!l_port) { + fputs("we ran out of ports?\n", stderr); ev_break(EV_A_ EVBREAK_ONE); return; } - sock->rsock->c_data = c_data; + sock->l_port = htons(l_port); - ev_io_init(&sock->rsock->io_w, cc_cb, sock->rsock->fd, EV_READ); - sock->rsock->io_w.data = sock->rsock; - ev_io_start(EV_A_ &sock->rsock->io_w); + HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock); + HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock); - HASH_ADD_KEYPTR(hh, c_data->o_rsocks, sock->rsock->r_addr, sock->rsock->r_addrlen, sock->rsock); - } + sock->seq_num = random(); - uint16_t l_port = reserve_port(sock->rsock->used_ports); - DBG("using port %hu", l_port); - if (!l_port) { - fputs("we ran out of ports?\n", stderr); - ev_break(EV_A_ EVBREAK_ONE); - return; - } - sock->l_port = htons(l_port); + struct tcphdr buf = { + .th_sport = sock->l_port, + .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, + .th_seq = htonl(sock->seq_num++), + .th_flags = TH_SYN, + .th_off = 5 + }; + + sock->pending_data = malloc(sz); + memcpy(sock->pending_data, rbuf, sz); + sock->pending_data_size = sz; + + DBG("sending SYN to remote"); + sz = send(sock->rsock->fd, &buf, sizeof(buf), 0); + if (sz < 0) { + perror("send"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)sz != sizeof(buf)) { + fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + } - HASH_ADD_KEYPTR(hh_ca, c_data->o_socks_by_caddr, sock->c_address, addresslen, sock); - HASH_ADD(hh_lp, sock->rsock->o_socks_by_lport, l_port, sizeof(in_port_t), sock); + // resend SYN - sock->seq_num = random(); + ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]); + sock->tm_w.data = sock; + sock->syn_retries = 0; + c_adv_syn_tm(EV_A_ sock); - struct tcphdr buf = { + sock->status = TCP_SYN_SENT; + + return; + } + + struct tcphdr tcp_hdr = { .th_sport = sock->l_port, .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, - .th_seq = htonl(sock->seq_num++), - .th_flags = TH_SYN, - .th_off = 5 + .th_seq = htonl(sock->seq_num), + .th_off = 5, + .th_win = 65535, + .th_flags = TH_PUSH + }; + + sock->seq_num += sz; + + struct iovec iovs[2] = { + { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) }, + { .iov_base = rbuf, .iov_len = sz } }; - sock->pending_data = malloc(sz); - memcpy(sock->pending_data, rbuf, sz); - sock->pending_data_size = sz; + struct msghdr msghdr = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iovs, + .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) + }; - DBG("sending SYN to remote"); - sz = send(sock->rsock->fd, &buf, sizeof(buf), 0); + size_t should_send_size = sizeof(tcp_hdr) + sz; + DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz); + sz = sendmsg(sock->rsock->fd, &msghdr, 0); if (sz < 0) { - perror("send"); + perror("sendmsg"); ev_break(EV_A_ EVBREAK_ONE); return; - } else if ((size_t)sz != sizeof(buf)) { - fprintf(stderr, "send %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + } else if ((size_t)sz != should_send_size) { + fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); } - - // resend SYN - - ev_timer_init(&sock->tm_w, c_syn_tm_cb, 0., tcp_syn_retry_timeouts[0]); - sock->tm_w.data = sock; - sock->syn_retries = 0; - c_adv_syn_tm(EV_A_ sock); - - sock->status = TCP_SYN_SENT; - - return; + ev_timer_again(EV_A_ &sock->tm_w); } - - struct tcphdr tcp_hdr = { - .th_sport = sock->l_port, - .th_dport = ((struct sockaddr_in *)sock->rsock->r_addr)->sin_port, - .th_seq = htonl(sock->seq_num), - .th_off = 5, - .th_win = 65535, - .th_flags = TH_PUSH - }; - - sock->seq_num += sz; - - struct iovec iovs[2] = { - { .iov_base = &tcp_hdr, .iov_len = sizeof(tcp_hdr) }, - { .iov_base = rbuf, .iov_len = sz } - }; - - struct msghdr msghdr = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = iovs, - .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) - }; - - size_t should_send_size = sizeof(tcp_hdr) + sz; - DBG("sending %zd raw bytes containing %zd bytes payload to remote", should_send_size, sz); - sz = sendmsg(sock->rsock->fd, &msghdr, 0); - if (sz < 0) { - perror("sendmsg"); + if (errno != EAGAIN) { + perror("recvfrom"); ev_break(EV_A_ EVBREAK_ONE); - return; - } else if ((size_t)sz != should_send_size) { - fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); } - ev_timer_again(EV_A_ &sock->tm_w); } static void c_cleanup() { @@ -509,6 +516,11 @@ int start_client(const char *s_host, const char *s_port, const char *r_host, con freeaddrinfo(res); + if (fcntl(c_data.s_sock, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + return 4; + } + global_c_data = &c_data; atexit(c_cleanup); |