/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif

#define RPCRDMA_BACKCHANNEL_DEBUG
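
/* Tear down a single backchannel rpc_rqst: unlink its rpcrdma_req from
 * the transport's request list, release the request's registered
 * buffers, and free the rqst itself.
 */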
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        spin_lock(&buf->rb_reqslock);
        list_del(&req->rl_all);
        spin_unlock(&buf->rb_reqslock);

        rpcrdma_destroy_req(&r_xprt->rx_ia, req);

        kfree(rqst);
}
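
/* Build one backchannel rpc_rqst: allocate its rpcrdma_req, a regbuf
 * large enough for the RPC/RDMA header, and a send buffer sized to the
 * connection's inline thresholds, then point rq_snd_buf at that send
 * buffer.
 */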
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
        struct xdr_buf *buf;
        size_t size;

        req = rpcrdma_create_req(r_xprt);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->rl_backchannel = true;

        size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
        rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;

        size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
        rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_sendbuf = rb;
        /* so that rpcr_to_rdmar works when receiving a request */
        rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

        buf = &rqst->rq_snd_buf;
        buf->head[0].iov_base = rqst->rq_buffer;
        buf->head[0].iov_len = 0;
        buf->tail[0].iov_base = NULL;
        buf->tail[0].iov_len = 0;
        buf->page_len = 0;
        buf->len = 0;
        buf->buflen = size;

        return 0;

out_fail:
        rpcrdma_bc_free_rqst(r_xprt, rqst);
        return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of rep's. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
{
        struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
        struct rpcrdma_rep *rep;
        unsigned long flags;
        int rc = 0;

        while (count--) {
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        pr_err("RPC: %s: reply buffer alloc failed\n",
                               __func__);
                        rc = PTR_ERR(rep);
                        break;
                }

                spin_lock_irqsave(&buffers->rb_lock, flags);
                list_add(&rep->rr_list, &buffers->rb_recv_bufs);
                spin_unlock_irqrestore(&buffers->rb_lock, flags);
        }

        return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpc_rqst *rqst;
        unsigned int i;
        int rc;

        /* The backchannel reply path returns each rpc_rqst to the
         * bc_pa_list _after_ the reply is sent. If the server is
         * faster than the client, it can send another backward
         * direction request before the rpc_rqst is returned to the
         * list. The client rejects the request in this case.
         *
         * Twice as many rpc_rqsts are prepared to ensure there is
         * always an rpc_rqst available as soon as a reply is sent.
         */
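        /* (RPCRDMA_BACKWARD_WRS is the number of work requests reserved
         *  for backchannel operation; since twice @reqs rpc_rqsts are
         *  provisioned below, @reqs may not exceed half of it.)
         */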
        if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
                goto out_err;

        for (i = 0; i < (reqs << 1); i++) {
                rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
                if (!rqst) {
                        pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
                               __func__);
                        goto out_free;
                }

                rqst->rq_xprt = &r_xprt->rx_xprt;
                INIT_LIST_HEAD(&rqst->rq_list);
                INIT_LIST_HEAD(&rqst->rq_bc_list);

                if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
                        goto out_free;

                spin_lock_bh(&xprt->bc_pa_lock);
                list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);
        }
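
        /* Receive buffers and extra receive work requests are set up
         * below so that incoming backchannel calls have somewhere to
         * land as soon as they arrive.
         */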
        rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
        if (rc)
                goto out_free;

        rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
        if (rc)
                goto out_free;

        buffer->rb_bc_srv_max_requests = reqs;
        request_module("svcrdma");

        return 0;

out_free:
        xprt_rdma_bc_destroy(xprt, reqs);

out_err:
        pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
        return -ENOMEM;
}

/**
 * rpcrdma_bc_marshal_reply - Send backwards direction reply
 * @rqst: buffer containing RPC reply data
 *
 * Returns zero on success.
 */
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_msg *headerp;
        size_t rpclen;

        headerp = rdmab_to_msg(req->rl_rdmabuf);
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
        headerp->rm_credit =
                        cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
        headerp->rm_type = rdma_msg;
        headerp->rm_body.rm_chunks[0] = xdr_zero;
        headerp->rm_body.rm_chunks[1] = xdr_zero;
        headerp->rm_body.rm_chunks[2] = xdr_zero;

        rpclen = rqst->rq_svec[0].iov_len;

        pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
                __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
        pr_info("RPC: %s: RPC/RDMA: %*ph\n",
                __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
        pr_info("RPC: %s: RPC: %*ph\n",
                __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
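
        /* The reply is posted as two SGEs: the RPC/RDMA header out of
         * rl_rdmabuf, followed by the RPC reply message itself out of
         * rl_sendbuf.
         */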
        req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
        req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

        req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
        req->rl_send_iov[1].length = rpclen;
        req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

        req->rl_niovs = 2;
        return 0;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpc_rqst *rqst, *tmp;
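
        /* bc_pa_lock is dropped and re-taken around each free below:
         * rpcrdma_bc_free_rqst() takes the buffer's rb_reqslock and
         * releases the request's registered buffers, so it is called
         * without bc_pa_lock held.
         */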
        spin_lock_bh(&xprt->bc_pa_lock);
        list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                list_del(&rqst->rq_bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);

                rpcrdma_bc_free_rqst(r_xprt, rqst);

                spin_lock_bh(&xprt->bc_pa_lock);
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
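
        /* Paired memory barriers order the RPC_BC_PA_IN_USE update with
         * respect to surrounding accesses before the rqst goes back on
         * the free list.
         */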
        smp_mb__before_atomic();
        WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
        clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
        smp_mb__after_atomic();

        spin_lock_bh(&xprt->bc_pa_lock);
        list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Called in the RPC reply handler, which runs in a tasklet.
 * Be quick about it.
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct xdr_buf *buf;
        size_t size;
        __be32 *p;

        headerp = rdmab_to_msg(rep->rr_rdmabuf);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC: %s: callback XID %08x, length=%u\n",
                __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
        pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
#endif

        /* Sanity check:
         * Need at least enough bytes for RPC/RDMA header, as code
         * here references the header fields by array offset. Also,
         * backward calls are always inline, so ensure there
         * are some bytes beyond the RPC/RDMA header.
         */
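        /* (The extra 24 bytes leave room for the fixed fields of an RPC
         *  call header: xid, direction, RPC version, program, version,
         *  and procedure, 6 * 4 bytes.)
         */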
        if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
                goto out_short;
        p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
        size = rep->rr_len - RPCRDMA_HDRLEN_MIN;

        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
                spin_unlock(&xprt->bc_pa_lock);
                goto out_overflow;
        }
        rqst = list_first_entry(&xprt->bc_pa_list,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
#endif

        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
        rqst->rq_xid = headerp->rm_xid;
        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);

        buf = &rqst->rq_rcv_buf;
        memset(buf, 0, sizeof(*buf));
        buf->head[0].iov_base = p;
        buf->head[0].iov_len = size;
        buf->len = size;
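
        /* The call payload is handed to the ULP in place: rq_rcv_buf
         * points directly into the posted receive buffer rather than
         * into a copy.
         */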

        /* The receive buffer has to be hooked to the rpcrdma_req
         * so that it can be reposted after the server is done
         * parsing it but just before sending the backward
         * direction reply.
         */
        req = rpcr_to_rdmar(rqst);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC: %s: attaching rep %p to req %p\n",
                __func__, rep, req);
#endif
        req->rl_reply = rep;
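
        /* Note: the send path treats a request whose connect cookie
         * matches the transport's current cookie as a retransmit and
         * drops the connection. Zeroing the cookie below keeps the
         * backchannel reply from tripping that check.
         */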
        /* Defeat the retransmit detection logic in send_request */
        req->rl_connect_cookie = 0;

        /* Queue rqst for ULP's callback service */
        bc_serv = xprt->bc_serv;
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
        spin_unlock(&bc_serv->sv_cb_lock);

        wake_up(&bc_serv->sv_cb_waitq);

        r_xprt->rx_stats.bcall_count++;
        return;

out_overflow:
        pr_warn("RPC/RDMA backchannel overflow\n");
        xprt_disconnect_done(xprt);
        /* This receive buffer gets reposted automatically
         * when the connection is re-established.
         */
        return;

out_short:
        pr_warn("RPC/RDMA short backward direction call\n");

        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
                xprt_disconnect_done(xprt);
        else
                pr_warn("RPC: %s: reposting rep %p\n",
                        __func__, rep);
}