From 262965f53defd312a294b45366ea17907b6a616b Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Thu, 11 Aug 2005 16:25:56 -0400 Subject: [PATCH] [PATCH] RPC: separate TCP and UDP socket write paths Split the RPC client's main socket write path into a TCP version and a UDP version to eliminate another dependency on the "xprt->stream" variable. Compiler optimization removes unneeded code from xs_sendpages, as this function is now called with some constant arguments. We can now cleanly perform transport protocol-specific return code testing and error recovery in each path. Test-plan: Millions of fsx operations. Performance characterization such as "sio" or "iozone". Examine oprofile results for any changes before and after this patch is applied. Version: Thu, 11 Aug 2005 16:08:46 -0400 Signed-off-by: Chuck Lever Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 215 +++++++++++++++++++++++++----------------- 1 file changed, 128 insertions(+), 87 deletions(-) diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index f91529787b9b..57988300640a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -40,6 +40,12 @@ */ #define XS_MAX_RESVPORT (800U) +/* + * How many times to try sending a request on a socket before waiting + * for the socket buffer to clear. + */ +#define XS_SENDMSG_RETRY (10U) + #ifdef RPC_DEBUG # undef RPC_DEBUG_DATA # define RPCDBG_FACILITY RPCDBG_TRANS @@ -114,13 +120,18 @@ static int xs_send_tail(struct socket *sock, struct xdr_buf *xdr, unsigned int b * @base: starting position in the buffer * */ -static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) +static inline int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) { struct page **ppage = xdr->pages; unsigned int len, pglen = xdr->page_len; int err, ret = 0; ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int); + if (unlikely(!sock)) + return -ENOTCONN; + + clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); + len = xdr->head[0].iov_len; if (base < len || (addr != NULL && base == 0)) { err = xs_send_head(sock, addr, addrlen, xdr, base, len); @@ -187,140 +198,162 @@ out: } /** - * xs_sendmsg - write an RPC request to a socket - * @xprt: generic transport - * @req: the RPC request to write + * xs_nospace - place task on wait queue if transmit was incomplete + * @task: task to put to sleep * */ -static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) +static void xs_nospace(struct rpc_task *task) { - struct socket *sock = xprt->sock; - struct xdr_buf *xdr = &req->rq_snd_buf; - struct sockaddr *addr = NULL; - int addrlen = 0; - unsigned int skip; - int result; + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; - if (!sock) - return -ENOTCONN; + dprintk("RPC: %4d xmit incomplete (%u left of %u)\n", + task->tk_pid, req->rq_slen - req->rq_bytes_sent, + req->rq_slen); + + if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { + /* Protect against races with write_space */ + spin_lock_bh(&xprt->transport_lock); + + /* Don't race with disconnect */ + if (!xprt_connected(xprt)) + task->tk_status = -ENOTCONN; + else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) + xprt_wait_for_buffer_space(task); + + spin_unlock_bh(&xprt->transport_lock); + } else + /* Keep holding the socket if it is blocked */ + rpc_delay(task, HZ>>4); +} + +/** + * xs_udp_send_request - write an RPC request to a UDP socket + * @task: address of RPC task that manages the state of an RPC request + * + * Return values: + * 0: The request has been sent + * EAGAIN: The socket was blocked, please call again later to + * complete the request + * ENOTCONN: Caller needs to invoke connect logic then call again + * other: Some other error occured, the request was not sent + */ +static int xs_udp_send_request(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + struct xdr_buf *xdr = &req->rq_snd_buf; + int status; xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); - /* For UDP, we need to provide an address */ - if (!xprt->stream) { - addr = (struct sockaddr *) &xprt->addr; - addrlen = sizeof(xprt->addr); - } - /* Don't repeat bytes */ - skip = req->rq_bytes_sent; + req->rq_xtime = jiffies; + status = xs_sendpages(xprt->sock, (struct sockaddr *) &xprt->addr, + sizeof(xprt->addr), xdr, req->rq_bytes_sent); - clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); - result = xs_sendpages(sock, addr, addrlen, xdr, skip); + dprintk("RPC: xs_udp_send_request(%u) = %d\n", + xdr->len - req->rq_bytes_sent, status); - dprintk("RPC: xs_sendmsg(%d) = %d\n", xdr->len - skip, result); + if (likely(status >= (int) req->rq_slen)) + return 0; - if (result >= 0) - return result; + /* Still some bytes left; set up for a retry later. */ + if (status > 0) + status = -EAGAIN; - switch (result) { + switch (status) { + case -ENETUNREACH: + case -EPIPE: case -ECONNREFUSED: /* When the server has died, an ICMP port unreachable message * prompts ECONNREFUSED. */ - case -EAGAIN: break; - case -ECONNRESET: - case -ENOTCONN: - case -EPIPE: - /* connection broken */ - if (xprt->stream) - result = -ENOTCONN; + case -EAGAIN: + xs_nospace(task); break; default: + dprintk("RPC: sendmsg returned unrecognized error %d\n", + -status); break; } - return result; + + return status; } /** - * xs_send_request - write an RPC request to a socket + * xs_tcp_send_request - write an RPC request to a TCP socket * @task: address of RPC task that manages the state of an RPC request * * Return values: - * 0: The request has been sent - * EAGAIN: The socket was blocked, please call again later to - * complete the request - * other: Some other error occured, the request was not sent + * 0: The request has been sent + * EAGAIN: The socket was blocked, please call again later to + * complete the request + * ENOTCONN: Caller needs to invoke connect logic then call again + * other: Some other error occured, the request was not sent * * XXX: In the case of soft timeouts, should we eventually give up - * if the socket is not able to make progress? + * if sendmsg is not able to make progress? */ -static int xs_send_request(struct rpc_task *task) +static int xs_tcp_send_request(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; + struct xdr_buf *xdr = &req->rq_snd_buf; + u32 *marker = req->rq_svec[0].iov_base; int status, retry = 0; - /* set up everything as needed. */ /* Write the record marker */ - if (xprt->stream) { - u32 *marker = req->rq_svec[0].iov_base; + *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); - *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); - } + xs_pktdump("packet data:", + req->rq_svec->iov_base, + req->rq_svec->iov_len); /* Continue transmitting the packet/record. We must be careful * to cope with writespace callbacks arriving _after_ we have - * called sendmsg(). - */ + * called sendmsg(). */ while (1) { req->rq_xtime = jiffies; - status = xs_sendmsg(xprt, req); + status = xs_sendpages(xprt->sock, NULL, 0, xdr, + req->rq_bytes_sent); - if (status < 0) - break; + dprintk("RPC: xs_tcp_send_request(%u) = %d\n", + xdr->len - req->rq_bytes_sent, status); - if (xprt->stream) { - req->rq_bytes_sent += status; - - /* If we've sent the entire packet, immediately - * reset the count of bytes sent. */ - if (req->rq_bytes_sent >= req->rq_slen) { - req->rq_bytes_sent = 0; - return 0; - } - } else { - if (status >= req->rq_slen) - return 0; - status = -EAGAIN; + if (unlikely(status < 0)) break; - } - dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", - task->tk_pid, req->rq_slen - req->rq_bytes_sent, - req->rq_slen); + /* If we've sent the entire packet, immediately + * reset the count of bytes sent. */ + req->rq_bytes_sent += status; + if (likely(req->rq_bytes_sent >= req->rq_slen)) { + req->rq_bytes_sent = 0; + return 0; + } status = -EAGAIN; - if (retry++ > 50) + if (retry++ > XS_SENDMSG_RETRY) break; } - if (status == -EAGAIN) { - if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { - /* Protect against races with write_space */ - spin_lock_bh(&xprt->transport_lock); - /* Don't race with disconnect */ - if (!xprt_connected(xprt)) - task->tk_status = -ENOTCONN; - else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) - xprt_wait_for_buffer_space(task); - spin_unlock_bh(&xprt->transport_lock); - return status; - } - /* Keep holding the socket if it is blocked */ - rpc_delay(task, HZ>>4); + switch (status) { + case -EAGAIN: + xs_nospace(task); + break; + case -ECONNREFUSED: + case -ECONNRESET: + case -ENOTCONN: + case -EPIPE: + status = -ENOTCONN; + break; + default: + dprintk("RPC: sendmsg returned unrecognized error %d\n", + -status); + break; } + return status; } @@ -992,10 +1025,18 @@ static void xs_connect(struct rpc_task *task) } } -static struct rpc_xprt_ops xs_ops = { +static struct rpc_xprt_ops xs_udp_ops = { + .set_buffer_size = xs_set_buffer_size, + .connect = xs_connect, + .send_request = xs_udp_send_request, + .close = xs_close, + .destroy = xs_destroy, +}; + +static struct rpc_xprt_ops xs_tcp_ops = { .set_buffer_size = xs_set_buffer_size, .connect = xs_connect, - .send_request = xs_send_request, + .send_request = xs_tcp_send_request, .close = xs_close, .destroy = xs_destroy, }; @@ -1033,7 +1074,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt); - xprt->ops = &xs_ops; + xprt->ops = &xs_udp_ops; if (to) xprt->timeout = *to; @@ -1072,7 +1113,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt); - xprt->ops = &xs_ops; + xprt->ops = &xs_tcp_ops; if (to) xprt->timeout = *to; -- 2.34.1