From 5834fc2036259c9f9924156638502bbf9aa15f06 Mon Sep 17 00:00:00 2001 From: Alex Xu Date: Sun, 3 Jul 2016 21:43:13 -0400 Subject: Use non-blocking IO, loop recvfrom. --- src/server.c | 312 ++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 169 insertions(+), 143 deletions(-) (limited to 'src/server.c') diff --git a/src/server.c b/src/server.c index 158c0af..462212f 100644 --- a/src/server.c +++ b/src/server.c @@ -1,5 +1,7 @@ #include +#include #include +#include #include #include #include @@ -84,51 +86,58 @@ static void sc_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { DBG("-- entering sc_cb --"); - if ((sz = recv(w->fd, rbuf, sizeof(rbuf), 0)) < 0) { - perror("recv"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + while ((sz = recv(w->fd, rbuf, sizeof(rbuf), 0)) > 0) { + DBG("received %zd bytes matching socket %p", sz, sock); - DBG("received %zd bytes matching socket %p", sz, sock); + struct tcphdr hdr; + s_prep_c_addr(sock, &hdr); + hdr.th_off = 5; - struct tcphdr hdr; - s_prep_c_addr(sock, &hdr); - hdr.th_off = 5; + struct iovec iovs[2] = { + { .iov_base = &hdr, .iov_len = sizeof(hdr) }, + { .iov_base = rbuf, .iov_len = sz } + }; - struct iovec iovs[2] = { - { .iov_base = &hdr, .iov_len = sizeof(hdr) }, - { .iov_base = rbuf, .iov_len = sz } - }; + in_port_t c_port = ((struct sockaddr_in *)&sock->c_addr)->sin_port; + ((struct sockaddr_in *)&sock->c_addr)->sin_port = 0; - in_port_t c_port = ((struct sockaddr_in *)&sock->c_addr)->sin_port; - ((struct sockaddr_in *)&sock->c_addr)->sin_port = 0; + struct msghdr msghdr = { + .msg_name = &sock->c_addr, + .msg_namelen = sock->s_data->s_addrlen, + .msg_iov = iovs, + .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) + }; - struct msghdr msghdr = { - .msg_name = &sock->c_addr, - .msg_namelen = sock->s_data->s_addrlen, - .msg_iov = iovs, - .msg_iovlen = sizeof(iovs) / sizeof(iovs[0]) - }; + size_t should_send_size = sizeof(hdr) + sz; - size_t should_send_size = sizeof(hdr) + sz; + assert(sock->status == TCP_ESTABLISHED); - assert(sock->status == TCP_ESTABLISHED); + DBG("sending %zd bytes to client", should_send_size); + sz = sendmsg(sock->s_data->s_sock, &msghdr, 0); - DBG("sending %zd bytes to client", should_send_size); - sz = sendmsg(sock->s_data->s_sock, &msghdr, 0); + ((struct sockaddr_in *)&sock->c_addr)->sin_port = c_port; - ((struct sockaddr_in *)&sock->c_addr)->sin_port = c_port; + if (sz < 0) { + perror("sendmsg"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if ((size_t)sz != should_send_size) { + fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); + } + + ev_timer_again(EV_A_ &sock->tm_w); - if (sz < 0) { - perror("sendmsg"); - ev_break(EV_A_ EVBREAK_ONE); return; - } else if ((size_t)sz != should_send_size) { - fprintf(stderr, "sendmsg %s our packet: tried %lu, sent %zd\n", (size_t)sz > should_send_size ? "expanded" : "truncated", should_send_size, sz); } - ev_timer_again(EV_A_ &sock->tm_w); + if (sz == 0) + abort(); + + if (errno != EAGAIN) { + perror("recv"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } } static void ss_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { @@ -140,158 +149,169 @@ static void ss_cb(EV_P_ ev_io *w, int revents __attribute__((unused))) { DBG("-- entering ss_cb --"); - if ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&s_data->pkt_addr, &c_addrlen)) < 0) { - perror("recvfrom"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } - - if (c_addrlen != s_data->s_addrlen) - abort(); + while ((sz = recvfrom(w->fd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&s_data->pkt_addr, &c_addrlen)) > 0) { + if (c_addrlen != s_data->s_addrlen) + abort(); #ifdef DEBUG - char hbuf[NI_MAXHOST]; - r = getnameinfo((struct sockaddr *)&s_data->pkt_addr, c_addrlen, hbuf, sizeof(hbuf), NULL, 0, NI_NUMERICHOST); - if (r) { - fprintf(stderr, "getnameinfo: %s\n", gai_strerror(r)); - ev_break(EV_A_ EVBREAK_ONE); - return; - } - DBG("received %zd bytes from %s", sz, hbuf); + char hbuf[NI_MAXHOST]; + r = getnameinfo((struct sockaddr *)&s_data->pkt_addr, c_addrlen, hbuf, sizeof(hbuf), NULL, 0, NI_NUMERICHOST); + if (r) { + fprintf(stderr, "getnameinfo: %s\n", gai_strerror(r)); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + DBG("received %zd bytes from %s", sz, hbuf); #endif - if ((size_t)sz < sizeof(struct tcphdr)) { - DBG("packet is smaller than TCP header, ignoring"); - return; - } + if ((size_t)sz < sizeof(struct tcphdr)) { + DBG("packet is smaller than TCP header, ignoring"); + return; + } - struct tcphdr *tcphdr = (struct tcphdr *)rbuf; + struct tcphdr *tcphdr = (struct tcphdr *)rbuf; - DBG("packet received on port %hu", ntohs(tcphdr->th_dport)); + DBG("packet received on port %hu", ntohs(tcphdr->th_dport)); - if (tcphdr->th_dport != ((struct sockaddr_in *)s_data->s_addr)->sin_port) { - DBG("packet should be on port %hu, ignoring", ntohs(((struct sockaddr_in *)s_data->s_addr)->sin_port)); - return; - } + if (tcphdr->th_dport != ((struct sockaddr_in *)s_data->s_addr)->sin_port) { + DBG("packet should be on port %hu, ignoring", ntohs(((struct sockaddr_in *)s_data->s_addr)->sin_port)); + return; + } - struct o_s_sock *sock; + struct o_s_sock *sock; - const uint8_t th_flags = tcphdr->th_flags; + const uint8_t th_flags = tcphdr->th_flags; - ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = tcphdr->th_sport; + ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = tcphdr->th_sport; - HASH_FIND(hh, s_data->o_socks_by_caddr, &s_data->pkt_addr, c_addrlen, sock); + HASH_FIND(hh, s_data->o_socks_by_caddr, &s_data->pkt_addr, c_addrlen, sock); - if (!sock) { - DBG("could not locate matching socket for client addr"); + if (!sock) { + DBG("could not locate matching socket for client addr"); - if (th_flags == TH_SYN) { - sock = malloc(sizeof(*sock)); + if (th_flags == TH_SYN) { + sock = malloc(sizeof(*sock)); - DBG("packet was SYN, initializing new connection @ %p", sock); + DBG("packet was SYN, initializing new connection @ %p", sock); - memcpy(&sock->c_addr, &s_data->pkt_addr, c_addrlen); + memcpy(&sock->c_addr, &s_data->pkt_addr, c_addrlen); - sock->s_data = s_data; - sock->seq_num = random(); - sock->c_sock = -1; - sock->status = TCP_SYN_RECV; + sock->s_data = s_data; + sock->seq_num = random(); + sock->c_sock = -1; + sock->status = TCP_SYN_RECV; - struct tcphdr buf = { - .th_sport = tcphdr->th_dport, - .th_dport = tcphdr->th_sport, - .th_seq = htonl(sock->seq_num), - .th_ack = tcphdr->th_seq, - .th_flags = TH_SYN | TH_ACK, - .th_off = 5 - }; + struct tcphdr buf = { + .th_sport = tcphdr->th_dport, + .th_dport = tcphdr->th_sport, + .th_seq = htonl(sock->seq_num), + .th_ack = tcphdr->th_seq, + .th_flags = TH_SYN | TH_ACK, + .th_off = 5 + }; - HASH_ADD(hh, s_data->o_socks_by_caddr, c_addr, c_addrlen, sock); + HASH_ADD(hh, s_data->o_socks_by_caddr, c_addr, c_addrlen, sock); - ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = htons(0); + ((struct sockaddr_in *)&s_data->pkt_addr)->sin_port = htons(0); - DBG("sending SYN/ACK"); - if ((sz = sendto(w->fd, &buf, sizeof(buf), 0, (struct sockaddr *)&s_data->pkt_addr, s_data->s_addrlen)) == -1) { - perror("sendto"); - ev_break(EV_A_ EVBREAK_ONE); - return; - } else if (sz != sizeof(buf)) { - fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + DBG("sending SYN/ACK"); + if ((sz = sendto(w->fd, &buf, sizeof(buf), 0, (struct sockaddr *)&s_data->pkt_addr, s_data->s_addrlen)) == -1) { + perror("sendto"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } else if (sz != sizeof(buf)) { + fprintf(stderr, "sendto %s our packet: tried %lu, sent %zd\n", (size_t)sz > sizeof(buf) ? "expanded" : "truncated", sizeof(buf), sz); + } + + ev_init(&sock->tm_w, s_tm_cb); + sock->tm_w.repeat = 10. * 60.; + sock->tm_w.data = sock; + ev_timer_again(EV_A_ &sock->tm_w); + } else { + DBG("packet was not SYN, ignoring"); } - ev_init(&sock->tm_w, s_tm_cb); - sock->tm_w.repeat = 10. * 60.; - sock->tm_w.data = sock; - ev_timer_again(EV_A_ &sock->tm_w); - } else { - DBG("packet was not SYN, ignoring"); + return; } - return; - } + if (tcphdr->th_off != 5) { + DBG("TCP options were specified, dropping packet"); + return; + } - if (tcphdr->th_off != 5) { - DBG("TCP options were specified, dropping packet"); - return; - } + if (th_flags == TH_RST) { + DBG("RST received, cleaning up socket"); + sock->status = TCP_CLOSE; + s_sock_cleanup(EV_A_ sock); + } - if (th_flags == TH_RST) { - DBG("RST received, cleaning up socket"); - sock->status = TCP_CLOSE; - s_sock_cleanup(EV_A_ sock); - } + if (th_flags & ~(TH_PUSH | TH_ACK)) { + DBG("TCP flags not PSH and/or ACK, dropping packet"); + return; + } - if (th_flags & ~(TH_PUSH | TH_ACK)) { - DBG("TCP flags not PSH and/or ACK, dropping packet"); - return; - } + if (sock->status == TCP_SYN_RECV) { + assert(sock->c_sock == -1); - if (sock->status == TCP_SYN_RECV) { - assert(sock->c_sock == -1); + DBG("no UDP socket for this connection, shifting to ESTABLISHED"); - DBG("no UDP socket for this connection, shifting to ESTABLISHED"); + sock->status = TCP_ESTABLISHED; - sock->status = TCP_ESTABLISHED; + struct addrinfo *res; + r = getaddrinfo(s_data->r_host, s_data->r_port, NULL, &res); + if (r) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); + ev_break(EV_A_ EVBREAK_ONE); + return; + } - struct addrinfo *res; - r = getaddrinfo(s_data->r_host, s_data->r_port, NULL, &res); - if (r) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(r)); - ev_break(EV_A_ EVBREAK_ONE); - return; - } + if ((sock->c_sock = socket(s_data->s_addr->sa_family, SOCK_DGRAM, 0)) == -1) { + perror("socket"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } - if ((sock->c_sock = socket(s_data->s_addr->sa_family, SOCK_DGRAM, 0)) == -1) { - perror("socket"); - ev_break(EV_A_ EVBREAK_ONE); - return; + if (connect(sock->c_sock, res->ai_addr, res->ai_addrlen)) { + perror("connect"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + + if (fcntl(sock->c_sock, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + ev_break(EV_A_ EVBREAK_ONE); + return; + } + + freeaddrinfo(res); + + ev_timer_stop(EV_A_ &sock->tm_w); + sock->tm_w.repeat = 60. * 60. * 3.; + ev_timer_start(EV_A_ &sock->tm_w); + + ev_io_init(&sock->io_w, sc_cb, sock->c_sock, EV_READ); + sock->io_w.data = sock; + ev_io_start(EV_A_ &sock->io_w); } - if (connect(sock->c_sock, res->ai_addr, res->ai_addrlen)) { - perror("connect"); + assert(sock->status == TCP_ESTABLISHED); + + DBG("sending %zu bytes to client", (size_t)(sz - tcphdr->th_off * 4)); + sz = send(sock->c_sock, rbuf + tcphdr->th_off * 4, sz - tcphdr->th_off * 4, 0); + if (sz < 0) { + perror("send"); ev_break(EV_A_ EVBREAK_ONE); return; } - - freeaddrinfo(res); - - ev_timer_stop(EV_A_ &sock->tm_w); - sock->tm_w.repeat = 60. * 60. * 3.; - ev_timer_start(EV_A_ &sock->tm_w); - - ev_io_init(&sock->io_w, sc_cb, sock->c_sock, EV_READ); - sock->io_w.data = sock; - ev_io_start(EV_A_ &sock->io_w); + return; } - assert(sock->status == TCP_ESTABLISHED); + if (sz == 0) + abort(); - DBG("sending %zu bytes to client", (size_t)(sz - tcphdr->th_off * 4)); - sz = send(sock->c_sock, rbuf + tcphdr->th_off * 4, sz - tcphdr->th_off * 4, 0); - if (sz < 0) { - perror("send"); + if (errno != EINVAL) { + perror("recvfrom"); ev_break(EV_A_ EVBREAK_ONE); - return; } } @@ -317,6 +337,12 @@ int start_server(const char *s_host, const char *s_port, const char *r_host, con return 1; } + if (fcntl(s_data.s_sock, F_SETFL, O_NONBLOCK) == -1) { + perror("fcntl"); + freeaddrinfo(res); + return 1; + } + struct ev_loop *loop = EV_DEFAULT; ev_io s_watcher; -- cgit v1.2.3-54-g00ecf