/*
 * net/sunrpc/xprtrdma/backchannel.c
 *
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

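/* RPCRDMA_BACKCHANNEL_DEBUG enables the pr_info() tracing that is
 * guarded by #ifdef in rpcrdma_bc_receive_call() below.
 */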
#define RPCRDMA_BACKCHANNEL_DEBUG

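/* Tear down one preallocated backchannel rpc_rqst: unlink the backing
 * rpcrdma_req from the buffer's list of requests, destroy the req and
 * its registered buffers, then free the rpc_rqst itself.
 */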
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        spin_lock(&buf->rb_reqslock);
        list_del(&req->rl_all);
        spin_unlock(&buf->rb_reqslock);

        rpcrdma_destroy_req(&r_xprt->rx_ia, req);

        kfree(rqst);
}

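/* Allocate the rpcrdma_req backing one preallocated backchannel
 * rpc_rqst, plus the registered buffers it needs to send a reply:
 * one for the RPC/RDMA header and one for the RPC reply message.
 */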
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
        struct xdr_buf *buf;
        size_t size;

        req = rpcrdma_create_req(r_xprt);
        if (!req)
                return -ENOMEM;
        req->rl_backchannel = true;

        size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
        rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;

        size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
        rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        rb->rg_owner = req;
        req->rl_sendbuf = rb;
        /* so that rpcr_to_rdmar works when receiving a request */
        rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

        buf = &rqst->rq_snd_buf;
        buf->head[0].iov_base = rqst->rq_buffer;
        buf->head[0].iov_len = 0;
        buf->tail[0].iov_base = NULL;
        buf->tail[0].iov_len = 0;
        buf->page_len = 0;
        buf->len = 0;
        buf->buflen = size;

        return 0;

out_fail:
        rpcrdma_bc_free_rqst(r_xprt, rqst);
        return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of reps. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
{
        struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
        struct rpcrdma_rep *rep;
        unsigned long flags;
        int rc = 0;

        while (count--) {
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        pr_err("RPC:       %s: reply buffer alloc failed\n",
                               __func__);
                        rc = PTR_ERR(rep);
                        break;
                }

                spin_lock_irqsave(&buffers->rb_lock, flags);
                list_add(&rep->rr_list, &buffers->rb_recv_bufs);
                spin_unlock_irqrestore(&buffers->rb_lock, flags);
        }

        return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpc_rqst *rqst;
        unsigned int i;
        int rc;

        /* The backchannel reply path returns each rpc_rqst to the
         * bc_pa_list _after_ the reply is sent. If the server is
         * faster than the client, it can send another backward
         * direction request before the rpc_rqst is returned to the
         * list. The client rejects the request in this case.
         *
         * Twice as many rpc_rqsts are prepared to ensure there is
         * always an rpc_rqst available as soon as a reply is sent.
         */
        if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
                goto out_err;

        for (i = 0; i < (reqs << 1); i++) {
                rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
                if (!rqst) {
                        pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
                               __func__);
                        goto out_free;
                }

                rqst->rq_xprt = &r_xprt->rx_xprt;
                INIT_LIST_HEAD(&rqst->rq_list);
                INIT_LIST_HEAD(&rqst->rq_bc_list);

                if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
                        goto out_free;

                spin_lock_bh(&xprt->bc_pa_lock);
                list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);
        }

        rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
        if (rc)
                goto out_free;

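        /* Post extra receive work requests so a receive buffer is
         * already available for each backward direction call that
         * might arrive.
         */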
        rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
        if (rc)
                goto out_free;

        buffer->rb_bc_srv_max_requests = reqs;
        request_module("svcrdma");

        return 0;

out_free:
        xprt_rdma_bc_destroy(xprt, reqs);

out_err:
        pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
        return -ENOMEM;
}

/**
 * rpcrdma_bc_marshal_reply - Send backwards direction reply
 * @rqst: buffer containing RPC reply data
 *
 * Returns zero on success.
 */
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_msg *headerp;
        size_t rpclen;

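        /* Build the RPC/RDMA header. A backward direction reply is
         * always a self-contained RDMA_MSG: all three chunk lists are
         * empty and the RPC reply follows the header inline.
         */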
        headerp = rdmab_to_msg(req->rl_rdmabuf);
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
        headerp->rm_credit =
                        cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
        headerp->rm_type = rdma_msg;
        headerp->rm_body.rm_chunks[0] = xdr_zero;
        headerp->rm_body.rm_chunks[1] = xdr_zero;
        headerp->rm_body.rm_chunks[2] = xdr_zero;

        rpclen = rqst->rq_svec[0].iov_len;

        pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
                __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
        pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
                __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
        pr_info("RPC:       %s:      RPC: %*ph\n",
                __func__, (int)rpclen, rqst->rq_svec[0].iov_base);

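        /* The reply is sent as two inline SGEs: the RPC/RDMA header,
         * then the RPC reply message itself.
         */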
        req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
        req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

        req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
        req->rl_send_iov[1].length = rpclen;
        req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

        req->rl_niovs = 2;
        return 0;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpc_rqst *rqst, *tmp;

        spin_lock_bh(&xprt->bc_pa_lock);
        list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                list_del(&rqst->rq_bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);

                rpcrdma_bc_free_rqst(r_xprt, rqst);

                spin_lock_bh(&xprt->bc_pa_lock);
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;

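        /* Clear the in-use flag, with barriers so the update is
         * ordered against surrounding accesses to the rqst, then
         * return the rqst to the bc_pa_list for the next call.
         */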
        smp_mb__before_atomic();
        WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
        clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
        smp_mb__after_atomic();

        spin_lock_bh(&xprt->bc_pa_lock);
        list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Called in the RPC reply handler, which runs in a tasklet.
 * Be quick about it.
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct xdr_buf *buf;
        size_t size;
        __be32 *p;

        headerp = rdmab_to_msg(rep->rr_rdmabuf);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: callback XID %08x, length=%u\n",
                __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
        pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
#endif

        /* Sanity check:
         * Need at least enough bytes for RPC/RDMA header, as code
         * here references the header fields by array offset. Also,
         * backward calls are always inline, so ensure there
         * are some bytes beyond the RPC/RDMA header.
         */
        if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
                goto out_short;
        p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
        size = rep->rr_len - RPCRDMA_HDRLEN_MIN;

        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
                spin_unlock(&xprt->bc_pa_lock);
                goto out_overflow;
        }
        rqst = list_first_entry(&xprt->bc_pa_list,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: using rqst %p\n", __func__, rqst);
#endif

        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
        rqst->rq_xid = headerp->rm_xid;
        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);

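        /* Point the receive buffer at the RPC message that follows
         * the RPC/RDMA header in the rep's inline buffer.
         */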
        buf = &rqst->rq_rcv_buf;
        memset(buf, 0, sizeof(*buf));
        buf->head[0].iov_base = p;
        buf->head[0].iov_len = size;
        buf->len = size;

        /* The receive buffer has to be hooked to the rpcrdma_req
         * so that it can be reposted after the server is done
         * parsing it but just before sending the backward
         * direction reply.
         */
        req = rpcr_to_rdmar(rqst);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: attaching rep %p to req %p\n",
                __func__, rep, req);
#endif
        req->rl_reply = rep;

        /* Defeat the retransmit detection logic in send_request */
        req->rl_connect_cookie = 0;

        /* Queue rqst for ULP's callback service */
        bc_serv = xprt->bc_serv;
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
        spin_unlock(&bc_serv->sv_cb_lock);

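        /* Wake the thread servicing the ULP's callback queue. */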
        wake_up(&bc_serv->sv_cb_waitq);

        r_xprt->rx_stats.bcall_count++;
        return;

out_overflow:
        pr_warn("RPC/RDMA backchannel overflow\n");
        xprt_disconnect_done(xprt);
        /* This receive buffer gets reposted automatically
         * when the connection is re-established.
         */
        return;

out_short:
        pr_warn("RPC/RDMA short backward direction call\n");

        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
                xprt_disconnect_done(xprt);
        else
                pr_warn("RPC:       %s: reposting rep %p\n",
                        __func__, rep);
}