net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <linux/bitops.h>
55
56 #include "xprt_rdma.h"
57
58 /*
59  * Globals/Macros
60  */
61
62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63 # define RPCDBG_FACILITY        RPCDBG_TRANS
64 #endif
65
66 /*
67  * internal functions
68  */
69
70 /*
71  * handle replies in tasklet context, using a single, global list
72  * rdma tasklet function -- just turn around and call the func
73  * for all replies on the list
74  */
75
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
78
79 static void
80 rpcrdma_run_tasklet(unsigned long data)
81 {
82         struct rpcrdma_rep *rep;
83         unsigned long flags;
84
85         data = data;    /* the tasklet data argument is unused */
86         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
87         while (!list_empty(&rpcrdma_tasklets_g)) {
88                 rep = list_entry(rpcrdma_tasklets_g.next,
89                                  struct rpcrdma_rep, rr_list);
90                 list_del(&rep->rr_list);
91                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
92
93                 rpcrdma_reply_handler(rep);
94
95                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
96         }
97         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
98 }
99
100 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
101
102 static const char * const async_event[] = {
103         "CQ error",
104         "QP fatal error",
105         "QP request error",
106         "QP access error",
107         "communication established",
108         "send queue drained",
109         "path migration successful",
110         "path mig error",
111         "device fatal error",
112         "port active",
113         "port error",
114         "LID change",
115         "P_key change",
116         "SM change",
117         "SRQ error",
118         "SRQ limit reached",
119         "last WQE reached",
120         "client reregister",
121         "GID change",
122 };
123
124 #define ASYNC_MSG(status)                                       \
125         ((status) < ARRAY_SIZE(async_event) ?                   \
126                 async_event[(status)] : "unknown async error")
127
128 static void
129 rpcrdma_schedule_tasklet(struct list_head *sched_list)
130 {
131         unsigned long flags;
132
133         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
134         list_splice_tail(sched_list, &rpcrdma_tasklets_g);
135         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
136         tasklet_schedule(&rpcrdma_tasklet_g);
137 }
138
139 static void
140 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
141 {
142         struct rpcrdma_ep *ep = context;
143
144         pr_err("RPC:       %s: %s on device %s ep %p\n",
145                __func__, ASYNC_MSG(event->event),
146                 event->device->name, context);
147         if (ep->rep_connected == 1) {
148                 ep->rep_connected = -EIO;
149                 rpcrdma_conn_func(ep);
150                 wake_up_all(&ep->rep_connect_wait);
151         }
152 }
153
154 static void
155 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
156 {
157         struct rpcrdma_ep *ep = context;
158
159         pr_err("RPC:       %s: %s on device %s ep %p\n",
160                __func__, ASYNC_MSG(event->event),
161                 event->device->name, context);
162         if (ep->rep_connected == 1) {
163                 ep->rep_connected = -EIO;
164                 rpcrdma_conn_func(ep);
165                 wake_up_all(&ep->rep_connect_wait);
166         }
167 }
168
169 static const char * const wc_status[] = {
170         "success",
171         "local length error",
172         "local QP operation error",
173         "local EE context operation error",
174         "local protection error",
175         "WR flushed",
176         "memory management operation error",
177         "bad response error",
178         "local access error",
179         "remote invalid request error",
180         "remote access error",
181         "remote operation error",
182         "transport retry counter exceeded",
183         "RNR retry counter exceeded",
184         "local RDD violation error",
185         "remote invalid RD request",
186         "operation aborted",
187         "invalid EE context number",
188         "invalid EE context state",
189         "fatal error",
190         "response timeout error",
191         "general error",
192 };
193
194 #define COMPLETION_MSG(status)                                  \
195         ((status) < ARRAY_SIZE(wc_status) ?                     \
196                 wc_status[(status)] : "unexpected completion error")
197
198 static void
199 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
200 {
201         /* WARNING: Only wr_id and status are reliable at this point */
202         if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
203                 if (wc->status != IB_WC_SUCCESS &&
204                     wc->status != IB_WC_WR_FLUSH_ERR)
205                         pr_err("RPC:       %s: SEND: %s\n",
206                                __func__, COMPLETION_MSG(wc->status));
207         } else {
208                 struct rpcrdma_mw *r;
209
210                 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
211                 r->mw_sendcompletion(wc);
212         }
213 }
214
215 static int
216 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
217 {
218         struct ib_wc *wcs;
219         int budget, count, rc;
220
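        /* Poll in batches of RPCRDMA_POLLSIZE completions, but cap the
         * total work per upcall at RPCRDMA_WC_BUDGET so a busy send CQ
         * cannot monopolize this context indefinitely.
         */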
221         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
222         do {
223                 wcs = ep->rep_send_wcs;
224
225                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
226                 if (rc <= 0)
227                         return rc;
228
229                 count = rc;
230                 while (count-- > 0)
231                         rpcrdma_sendcq_process_wc(wcs++);
232         } while (rc == RPCRDMA_POLLSIZE && --budget);
233         return 0;
234 }
235
236 /*
237  * Handle send, fast_reg_mr, and local_inv completions.
238  *
239  * Send events are typically suppressed and thus do not result
240  * in an upcall. Occasionally one is signaled, however. This
241  * prevents the provider's completion queue from wrapping and
242  * losing a completion.
243  */
244 static void
245 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
246 {
247         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
248         int rc;
249
250         rc = rpcrdma_sendcq_poll(cq, ep);
251         if (rc) {
252                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
253                         __func__, rc);
254                 return;
255         }
256
257         rc = ib_req_notify_cq(cq,
258                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
259         if (rc == 0)
260                 return;
261         if (rc < 0) {
262                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
263                         __func__, rc);
264                 return;
265         }
266
267         rpcrdma_sendcq_poll(cq, ep);
268 }
269
270 static void
271 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
272 {
273         struct rpcrdma_rep *rep =
274                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
275
276         /* WARNING: Only wr_id and status are reliable at this point */
277         if (wc->status != IB_WC_SUCCESS)
278                 goto out_fail;
279
280         /* status == SUCCESS means all fields in wc are trustworthy */
281         if (wc->opcode != IB_WC_RECV)
282                 return;
283
284         dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
285                 __func__, rep, wc->byte_len);
286
287         rep->rr_len = wc->byte_len;
288         ib_dma_sync_single_for_cpu(rep->rr_device,
289                                    rdmab_addr(rep->rr_rdmabuf),
290                                    rep->rr_len, DMA_FROM_DEVICE);
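        /* Warm the CPU cache with the start of the incoming message
         * before the reply handler runs in tasklet context.
         */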
291         prefetch(rdmab_to_msg(rep->rr_rdmabuf));
292
293 out_schedule:
294         list_add_tail(&rep->rr_list, sched_list);
295         return;
296 out_fail:
297         if (wc->status != IB_WC_WR_FLUSH_ERR)
298                 pr_err("RPC:       %s: rep %p: %s\n",
299                        __func__, rep, COMPLETION_MSG(wc->status));
300         rep->rr_len = ~0U;
301         goto out_schedule;
302 }
303
304 static int
305 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
306 {
307         struct list_head sched_list;
308         struct ib_wc *wcs;
309         int budget, count, rc;
310
311         INIT_LIST_HEAD(&sched_list);
312         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
313         do {
314                 wcs = ep->rep_recv_wcs;
315
316                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
317                 if (rc <= 0)
318                         goto out_schedule;
319
320                 count = rc;
321                 while (count-- > 0)
322                         rpcrdma_recvcq_process_wc(wcs++, &sched_list);
323         } while (rc == RPCRDMA_POLLSIZE && --budget);
324         rc = 0;
325
326 out_schedule:
327         rpcrdma_schedule_tasklet(&sched_list);
328         return rc;
329 }
330
331 /*
332  * Handle receive completions.
333  *
334  * It is reentrant but processes single events in order to maintain
335  * ordering of receives to keep server credits.
336  *
337  * It is the responsibility of the scheduled tasklet to return
338  * recv buffers to the pool. NOTE: this affects synchronization of
339  * connection shutdown. That is, the structures required for
340  * the completion of the reply handler must remain intact until
341  * all memory has been reclaimed.
342  */
343 static void
344 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
345 {
346         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
347         int rc;
348
349         rc = rpcrdma_recvcq_poll(cq, ep);
350         if (rc) {
351                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
352                         __func__, rc);
353                 return;
354         }
355
356         rc = ib_req_notify_cq(cq,
357                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
358         if (rc == 0)
359                 return;
360         if (rc < 0) {
361                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
362                         __func__, rc);
363                 return;
364         }
365
366         rpcrdma_recvcq_poll(cq, ep);
367 }
368
369 static void
370 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
371 {
372         struct ib_wc wc;
373         LIST_HEAD(sched_list);
374
375         while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
376                 rpcrdma_recvcq_process_wc(&wc, &sched_list);
377         if (!list_empty(&sched_list))
378                 rpcrdma_schedule_tasklet(&sched_list);
379         while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
380                 rpcrdma_sendcq_process_wc(&wc);
381 }
382
383 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
384 static const char * const conn[] = {
385         "address resolved",
386         "address error",
387         "route resolved",
388         "route error",
389         "connect request",
390         "connect response",
391         "connect error",
392         "unreachable",
393         "rejected",
394         "established",
395         "disconnected",
396         "device removal",
397         "multicast join",
398         "multicast error",
399         "address change",
400         "timewait exit",
401 };
402
403 #define CONNECTION_MSG(status)                                          \
404         ((status) < ARRAY_SIZE(conn) ?                                  \
405                 conn[(status)] : "unrecognized connection error")
406 #endif
407
408 static int
409 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
410 {
411         struct rpcrdma_xprt *xprt = id->context;
412         struct rpcrdma_ia *ia = &xprt->rx_ia;
413         struct rpcrdma_ep *ep = &xprt->rx_ep;
414 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
415         struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
416 #endif
417         struct ib_qp_attr *attr = &ia->ri_qp_attr;
418         struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
419         int connstate = 0;
420
421         switch (event->event) {
422         case RDMA_CM_EVENT_ADDR_RESOLVED:
423         case RDMA_CM_EVENT_ROUTE_RESOLVED:
424                 ia->ri_async_rc = 0;
425                 complete(&ia->ri_done);
426                 break;
427         case RDMA_CM_EVENT_ADDR_ERROR:
428                 ia->ri_async_rc = -EHOSTUNREACH;
429                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
430                         __func__, ep);
431                 complete(&ia->ri_done);
432                 break;
433         case RDMA_CM_EVENT_ROUTE_ERROR:
434                 ia->ri_async_rc = -ENETUNREACH;
435                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
436                         __func__, ep);
437                 complete(&ia->ri_done);
438                 break;
439         case RDMA_CM_EVENT_ESTABLISHED:
440                 connstate = 1;
441                 ib_query_qp(ia->ri_id->qp, attr,
442                             IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
443                             iattr);
444                 dprintk("RPC:       %s: %d responder resources"
445                         " (%d initiator)\n",
446                         __func__, attr->max_dest_rd_atomic,
447                         attr->max_rd_atomic);
448                 goto connected;
449         case RDMA_CM_EVENT_CONNECT_ERROR:
450                 connstate = -ENOTCONN;
451                 goto connected;
452         case RDMA_CM_EVENT_UNREACHABLE:
453                 connstate = -ENETDOWN;
454                 goto connected;
455         case RDMA_CM_EVENT_REJECTED:
456                 connstate = -ECONNREFUSED;
457                 goto connected;
458         case RDMA_CM_EVENT_DISCONNECTED:
459                 connstate = -ECONNABORTED;
460                 goto connected;
461         case RDMA_CM_EVENT_DEVICE_REMOVAL:
462                 connstate = -ENODEV;
463 connected:
464                 dprintk("RPC:       %s: %sconnected\n",
465                                         __func__, connstate > 0 ? "" : "dis");
466                 ep->rep_connected = connstate;
467                 rpcrdma_conn_func(ep);
468                 wake_up_all(&ep->rep_connect_wait);
469                 /*FALLTHROUGH*/
470         default:
471                 dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
472                         __func__, sap, rpc_get_port(sap), ep,
473                         CONNECTION_MSG(event->event));
474                 break;
475         }
476
477 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
478         if (connstate == 1) {
479                 int ird = attr->max_dest_rd_atomic;
480                 int tird = ep->rep_remote_cma.responder_resources;
481
482                 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
483                         sap, rpc_get_port(sap),
484                         ia->ri_device->name,
485                         ia->ri_ops->ro_displayname,
486                         xprt->rx_buf.rb_max_requests,
487                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
488         } else if (connstate < 0) {
489                 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
490                         sap, rpc_get_port(sap), connstate);
491         }
492 #endif
493
494         return 0;
495 }
496
497 static struct rdma_cm_id *
498 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
499                         struct rpcrdma_ia *ia, struct sockaddr *addr)
500 {
501         struct rdma_cm_id *id;
502         int rc;
503
504         init_completion(&ia->ri_done);
505
506         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
507         if (IS_ERR(id)) {
508                 rc = PTR_ERR(id);
509                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
510                         __func__, rc);
511                 return id;
512         }
513
514         ia->ri_async_rc = -ETIMEDOUT;
515         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
516         if (rc) {
517                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
518                         __func__, rc);
519                 goto out;
520         }
521         wait_for_completion_interruptible_timeout(&ia->ri_done,
522                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
523         rc = ia->ri_async_rc;
524         if (rc)
525                 goto out;
526
527         ia->ri_async_rc = -ETIMEDOUT;
528         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
529         if (rc) {
530                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
531                         __func__, rc);
532                 goto out;
533         }
534         wait_for_completion_interruptible_timeout(&ia->ri_done,
535                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
536         rc = ia->ri_async_rc;
537         if (rc)
538                 goto out;
539
540         return id;
541
542 out:
543         rdma_destroy_id(id);
544         return ERR_PTR(rc);
545 }
546
547 /*
548  * Drain any cq, prior to teardown.
549  */
550 static void
551 rpcrdma_clean_cq(struct ib_cq *cq)
552 {
553         struct ib_wc wc;
554         int count = 0;
555
556         while (ib_poll_cq(cq, 1, &wc) == 1)
557                 ++count;
558
559         if (count)
560                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
561                         __func__, count, wc.opcode);
562 }
563
564 /*
565  * Exported functions.
566  */
567
568 /*
569  * Open and initialize an Interface Adapter.
570  *  o initializes fields of struct rpcrdma_ia, including
571  *    interface and provider attributes and protection zone.
572  */
573 int
574 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
575 {
576         int rc, mem_priv;
577         struct rpcrdma_ia *ia = &xprt->rx_ia;
578         struct ib_device_attr *devattr = &ia->ri_devattr;
579
580         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
581         if (IS_ERR(ia->ri_id)) {
582                 rc = PTR_ERR(ia->ri_id);
583                 goto out1;
584         }
585         ia->ri_device = ia->ri_id->device;
586
587         ia->ri_pd = ib_alloc_pd(ia->ri_device);
588         if (IS_ERR(ia->ri_pd)) {
589                 rc = PTR_ERR(ia->ri_pd);
590                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
591                         __func__, rc);
592                 goto out2;
593         }
594
595         rc = ib_query_device(ia->ri_device, devattr);
596         if (rc) {
597                 dprintk("RPC:       %s: ib_query_device failed %d\n",
598                         __func__, rc);
599                 goto out3;
600         }
601
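        /* If the device provides a global DMA lkey, use it for all
         * internally registered buffers instead of allocating a
         * separate DMA MR (see rpcrdma_register_internal below).
         */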
602         if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
603                 ia->ri_have_dma_lkey = 1;
604                 ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
605         }
606
607         if (memreg == RPCRDMA_FRMR) {
608                 /* Requires both frmr reg and local dma lkey */
609                 if (((devattr->device_cap_flags &
610                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
611                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
612                       (devattr->max_fast_reg_page_list_len == 0)) {
613                         dprintk("RPC:       %s: FRMR registration "
614                                 "not supported by HCA\n", __func__);
615                         memreg = RPCRDMA_MTHCAFMR;
616                 }
617         }
618         if (memreg == RPCRDMA_MTHCAFMR) {
619                 if (!ia->ri_device->alloc_fmr) {
620                         dprintk("RPC:       %s: MTHCAFMR registration "
621                                 "not supported by HCA\n", __func__);
622                         memreg = RPCRDMA_ALLPHYSICAL;
623                 }
624         }
625
626         /*
627          * Optionally obtain an underlying physical identity mapping in
628          * order to do a memory window-based bind. This base registration
629          * is protected from remote access - that is enabled only by binding
630          * for the specific bytes targeted during each RPC operation, and
631          * revoked after the corresponding completion similar to a storage
632          * adapter.
633          */
634         switch (memreg) {
635         case RPCRDMA_FRMR:
636                 ia->ri_ops = &rpcrdma_frwr_memreg_ops;
637                 break;
638         case RPCRDMA_ALLPHYSICAL:
639                 ia->ri_ops = &rpcrdma_physical_memreg_ops;
640                 mem_priv = IB_ACCESS_LOCAL_WRITE |
641                                 IB_ACCESS_REMOTE_WRITE |
642                                 IB_ACCESS_REMOTE_READ;
643                 goto register_setup;
644         case RPCRDMA_MTHCAFMR:
645                 ia->ri_ops = &rpcrdma_fmr_memreg_ops;
646                 if (ia->ri_have_dma_lkey)
647                         break;
648                 mem_priv = IB_ACCESS_LOCAL_WRITE;
649         register_setup:
650                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
651                 if (IS_ERR(ia->ri_bind_mem)) {
652                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
653                                 "phys register failed with %lX\n",
654                                 __func__, PTR_ERR(ia->ri_bind_mem));
655                         rc = -ENOMEM;
656                         goto out3;
657                 }
658                 break;
659         default:
660                 printk(KERN_ERR "RPC: Unsupported memory "
661                                 "registration mode: %d\n", memreg);
662                 rc = -ENOMEM;
663                 goto out3;
664         }
665         dprintk("RPC:       %s: memory registration strategy is '%s'\n",
666                 __func__, ia->ri_ops->ro_displayname);
667
668         rwlock_init(&ia->ri_qplock);
669         return 0;
670
671 out3:
672         ib_dealloc_pd(ia->ri_pd);
673         ia->ri_pd = NULL;
674 out2:
675         rdma_destroy_id(ia->ri_id);
676         ia->ri_id = NULL;
677 out1:
678         return rc;
679 }
680
681 /*
682  * Clean up/close an IA.
683  *   o if event handles and PD have been initialized, free them.
684  *   o close the IA
685  */
686 void
687 rpcrdma_ia_close(struct rpcrdma_ia *ia)
688 {
689         int rc;
690
691         dprintk("RPC:       %s: entering\n", __func__);
692         if (ia->ri_bind_mem != NULL) {
693                 rc = ib_dereg_mr(ia->ri_bind_mem);
694                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
695                         __func__, rc);
696         }
697
698         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
699                 if (ia->ri_id->qp)
700                         rdma_destroy_qp(ia->ri_id);
701                 rdma_destroy_id(ia->ri_id);
702                 ia->ri_id = NULL;
703         }
704
705         /* If the pd is still busy, xprtrdma missed freeing a resource */
706         if (ia->ri_pd && !IS_ERR(ia->ri_pd))
707                 WARN_ON(ib_dealloc_pd(ia->ri_pd));
708 }
709
710 /*
711  * Create unconnected endpoint.
712  */
713 int
714 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
715                                 struct rpcrdma_create_data_internal *cdata)
716 {
717         struct ib_device_attr *devattr = &ia->ri_devattr;
718         struct ib_cq *sendcq, *recvcq;
719         int rc, err;
720
721         /* check provider's send/recv wr limits */
722         if (cdata->max_requests > devattr->max_qp_wr)
723                 cdata->max_requests = devattr->max_qp_wr;
724
725         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
726         ep->rep_attr.qp_context = ep;
727         ep->rep_attr.srq = NULL;
728         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
729         rc = ia->ri_ops->ro_open(ia, ep, cdata);
730         if (rc)
731                 return rc;
732         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
733         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
734         ep->rep_attr.cap.max_recv_sge = 1;
735         ep->rep_attr.cap.max_inline_data = 0;
736         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
737         ep->rep_attr.qp_type = IB_QPT_RC;
738         ep->rep_attr.port_num = ~0;
739
740         if (cdata->padding) {
741                 ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
742                                                       GFP_KERNEL);
743                 if (IS_ERR(ep->rep_padbuf))
744                         return PTR_ERR(ep->rep_padbuf);
745         } else
746                 ep->rep_padbuf = NULL;
747
748         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
749                 "iovs: send %d recv %d\n",
750                 __func__,
751                 ep->rep_attr.cap.max_send_wr,
752                 ep->rep_attr.cap.max_recv_wr,
753                 ep->rep_attr.cap.max_send_sge,
754                 ep->rep_attr.cap.max_recv_sge);
755
756         /* set trigger for requesting send completion */
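        /* Sends are posted unsignaled; rpcrdma_ep_post requests a
         * signaled completion once every rep_cqinit sends (see
         * DECR_CQCOUNT) so the send CQ is drained before it can
         * overflow.
         */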
757         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr / 2 - 1;
758         if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
759                 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
760         else if (ep->rep_cqinit <= 2)
761                 ep->rep_cqinit = 0;
762         INIT_CQCOUNT(ep);
763         init_waitqueue_head(&ep->rep_connect_wait);
764         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
765
766         sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
767                               rpcrdma_cq_async_error_upcall, ep,
768                               ep->rep_attr.cap.max_send_wr + 1, 0);
769         if (IS_ERR(sendcq)) {
770                 rc = PTR_ERR(sendcq);
771                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
772                         __func__, rc);
773                 goto out1;
774         }
775
776         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
777         if (rc) {
778                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
779                         __func__, rc);
780                 goto out2;
781         }
782
783         recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
784                               rpcrdma_cq_async_error_upcall, ep,
785                               ep->rep_attr.cap.max_recv_wr + 1, 0);
786         if (IS_ERR(recvcq)) {
787                 rc = PTR_ERR(recvcq);
788                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
789                         __func__, rc);
790                 goto out2;
791         }
792
793         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
794         if (rc) {
795                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
796                         __func__, rc);
797                 ib_destroy_cq(recvcq);
798                 goto out2;
799         }
800
801         ep->rep_attr.send_cq = sendcq;
802         ep->rep_attr.recv_cq = recvcq;
803
804         /* Initialize cma parameters */
805
806         /* RPC/RDMA does not use private data */
807         ep->rep_remote_cma.private_data = NULL;
808         ep->rep_remote_cma.private_data_len = 0;
809
810         /* Client offers RDMA Read but does not initiate */
811         ep->rep_remote_cma.initiator_depth = 0;
812         if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
813                 ep->rep_remote_cma.responder_resources = 32;
814         else
815                 ep->rep_remote_cma.responder_resources =
816                                                 devattr->max_qp_rd_atom;
817
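        /* retry_count is carried in a 3-bit field of the connection
         * request, so 7 is the maximum usable value.
         */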
818         ep->rep_remote_cma.retry_count = 7;
819         ep->rep_remote_cma.flow_control = 0;
820         ep->rep_remote_cma.rnr_retry_count = 0;
821
822         return 0;
823
824 out2:
825         err = ib_destroy_cq(sendcq);
826         if (err)
827                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
828                         __func__, err);
829 out1:
830         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
831         return rc;
832 }
833
834 /*
835  * rpcrdma_ep_destroy
836  *
837  * Disconnect and destroy endpoint. After this, the only
838  * valid operations on the ep are to free it (if dynamically
839  * allocated) or re-create it.
840  */
841 void
842 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
843 {
844         int rc;
845
846         dprintk("RPC:       %s: entering, connected is %d\n",
847                 __func__, ep->rep_connected);
848
849         cancel_delayed_work_sync(&ep->rep_connect_worker);
850
851         if (ia->ri_id->qp) {
852                 rpcrdma_ep_disconnect(ep, ia);
853                 rdma_destroy_qp(ia->ri_id);
854                 ia->ri_id->qp = NULL;
855         }
856
857         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
858
859         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
860         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
861         if (rc)
862                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
863                         __func__, rc);
864
865         rpcrdma_clean_cq(ep->rep_attr.send_cq);
866         rc = ib_destroy_cq(ep->rep_attr.send_cq);
867         if (rc)
868                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
869                         __func__, rc);
870 }
871
872 /*
873  * Connect unconnected endpoint.
874  */
875 int
876 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
877 {
878         struct rdma_cm_id *id, *old;
879         int rc = 0;
880         int retry_count = 0;
881
882         if (ep->rep_connected != 0) {
883                 struct rpcrdma_xprt *xprt;
884 retry:
885                 dprintk("RPC:       %s: reconnecting...\n", __func__);
886
887                 rpcrdma_ep_disconnect(ep, ia);
888                 rpcrdma_flush_cqs(ep);
889
890                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
891                 id = rpcrdma_create_id(xprt, ia,
892                                 (struct sockaddr *)&xprt->rx_data.addr);
893                 if (IS_ERR(id)) {
894                         rc = -EHOSTUNREACH;
895                         goto out;
896                 }
897                 /* TEMP TEMP TEMP - fail if new device:
898                  * Deregister/remarshal *all* requests!
899                  * Close and recreate adapter, pd, etc!
900                  * Re-determine all attributes still sane!
901                  * More stuff I haven't thought of!
902                  * Rrrgh!
903                  */
904                 if (ia->ri_device != id->device) {
905                         pr_err("RPC:       %s: can't reconnect on "
906                                "different device!\n", __func__);
907                         rdma_destroy_id(id);
908                         rc = -ENETUNREACH;
909                         goto out;
910                 }
911                 /* END TEMP */
912                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
913                 if (rc) {
914                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
915                                 __func__, rc);
916                         rdma_destroy_id(id);
917                         rc = -ENETUNREACH;
918                         goto out;
919                 }
920
921                 write_lock(&ia->ri_qplock);
922                 old = ia->ri_id;
923                 ia->ri_id = id;
924                 write_unlock(&ia->ri_qplock);
925
926                 rdma_destroy_qp(old);
927                 rdma_destroy_id(old);
928         } else {
929                 dprintk("RPC:       %s: connecting...\n", __func__);
930                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
931                 if (rc) {
932                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
933                                 __func__, rc);
934                         /* do not update ep->rep_connected */
935                         return -ENETUNREACH;
936                 }
937         }
938
939         ep->rep_connected = 0;
940
941         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
942         if (rc) {
943                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
944                                 __func__, rc);
945                 goto out;
946         }
947
948         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
949
950         /*
951          * Check state. A non-peer reject indicates no listener
952          * (ECONNREFUSED), which may be a transient state. All
953          * others indicate a transport condition that has already
954          * been handled on a best-effort basis.
955          */
956         if (ep->rep_connected == -ECONNREFUSED &&
957             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
958                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
959                 goto retry;
960         }
961         if (ep->rep_connected <= 0) {
962                 /* Sometimes, the only way to reliably connect to remote
963                  * CMs is to use the same nonzero values for ORD and IRD. */
964                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
965                     (ep->rep_remote_cma.responder_resources == 0 ||
966                      ep->rep_remote_cma.initiator_depth !=
967                                 ep->rep_remote_cma.responder_resources)) {
968                         if (ep->rep_remote_cma.responder_resources == 0)
969                                 ep->rep_remote_cma.responder_resources = 1;
970                         ep->rep_remote_cma.initiator_depth =
971                                 ep->rep_remote_cma.responder_resources;
972                         goto retry;
973                 }
974                 rc = ep->rep_connected;
975         } else {
976                 dprintk("RPC:       %s: connected\n", __func__);
977         }
978
979 out:
980         if (rc)
981                 ep->rep_connected = rc;
982         return rc;
983 }
984
985 /*
986  * rpcrdma_ep_disconnect
987  *
988  * This is separate from destroy to facilitate the ability
989  * to reconnect without recreating the endpoint.
990  *
991  * This call is not reentrant, and must not be made in parallel
992  * on the same endpoint.
993  */
994 void
995 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
996 {
997         int rc;
998
999         rpcrdma_flush_cqs(ep);
1000         rc = rdma_disconnect(ia->ri_id);
1001         if (!rc) {
1002                 /* returns without wait if not connected */
1003                 wait_event_interruptible(ep->rep_connect_wait,
1004                                                         ep->rep_connected != 1);
1005                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1006                         (ep->rep_connected == 1) ? "still " : "dis");
1007         } else {
1008                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1009                 ep->rep_connected = rc;
1010         }
1011 }
1012
1013 static struct rpcrdma_req *
1014 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1015 {
1016         struct rpcrdma_req *req;
1017
1018         req = kzalloc(sizeof(*req), GFP_KERNEL);
1019         if (req == NULL)
1020                 return ERR_PTR(-ENOMEM);
1021
1022         req->rl_buffer = &r_xprt->rx_buf;
1023         return req;
1024 }
1025
1026 static struct rpcrdma_rep *
1027 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1028 {
1029         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1030         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1031         struct rpcrdma_rep *rep;
1032         int rc;
1033
1034         rc = -ENOMEM;
1035         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1036         if (rep == NULL)
1037                 goto out;
1038
1039         rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1040                                                GFP_KERNEL);
1041         if (IS_ERR(rep->rr_rdmabuf)) {
1042                 rc = PTR_ERR(rep->rr_rdmabuf);
1043                 goto out_free;
1044         }
1045
1046         rep->rr_device = ia->ri_device;
1047         rep->rr_rxprt = r_xprt;
1048         return rep;
1049
1050 out_free:
1051         kfree(rep);
1052 out:
1053         return ERR_PTR(rc);
1054 }
1055
1056 int
1057 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1058 {
1059         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1060         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1061         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1062         char *p;
1063         size_t len;
1064         int i, rc;
1065
1066         buf->rb_max_requests = cdata->max_requests;
1067         spin_lock_init(&buf->rb_lock);
1068
1069         /* Need to allocate, in one block:
1070          *   1.  an array of pointers to struct rpcrdma_req
1071          *   2.  an array of pointers to struct rpcrdma_rep
1072          * The req and rep structures themselves, and their registered
1073          * send/recv buffers, are allocated separately, per request.
1074          */
1075         len = buf->rb_max_requests *
1076                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1077
1078         p = kzalloc(len, GFP_KERNEL);
1079         if (p == NULL) {
1080                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zu) failed\n",
1081                         __func__, len);
1082                 rc = -ENOMEM;
1083                 goto out;
1084         }
1085         buf->rb_pool = p;       /* for freeing it later */
1086
1087         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1088         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1089         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1090         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1091
1092         rc = ia->ri_ops->ro_init(r_xprt);
1093         if (rc)
1094                 goto out;
1095
1096         for (i = 0; i < buf->rb_max_requests; i++) {
1097                 struct rpcrdma_req *req;
1098                 struct rpcrdma_rep *rep;
1099
1100                 req = rpcrdma_create_req(r_xprt);
1101                 if (IS_ERR(req)) {
1102                         dprintk("RPC:       %s: request buffer %d alloc"
1103                                 " failed\n", __func__, i);
1104                         rc = PTR_ERR(req);
1105                         goto out;
1106                 }
1107                 buf->rb_send_bufs[i] = req;
1108
1109                 rep = rpcrdma_create_rep(r_xprt);
1110                 if (IS_ERR(rep)) {
1111                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1112                                 __func__, i);
1113                         rc = PTR_ERR(rep);
1114                         goto out;
1115                 }
1116                 buf->rb_recv_bufs[i] = rep;
1117         }
1118
1119         return 0;
1120 out:
1121         rpcrdma_buffer_destroy(buf);
1122         return rc;
1123 }
1124
1125 static void
1126 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1127 {
1128         if (!rep)
1129                 return;
1130
1131         rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1132         kfree(rep);
1133 }
1134
1135 static void
1136 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1137 {
1138         if (!req)
1139                 return;
1140
1141         rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1142         rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1143         kfree(req);
1144 }
1145
1146 void
1147 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1148 {
1149         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1150         int i;
1151
1152         /* clean up in reverse order from create
1153          *   1.  recv mr memory (mr free, then kfree)
1154          *   2.  send mr memory (mr free, then kfree)
1155          *   3.  MWs
1156          */
1157         dprintk("RPC:       %s: entering\n", __func__);
1158
1159         for (i = 0; i < buf->rb_max_requests; i++) {
1160                 if (buf->rb_recv_bufs)
1161                         rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1162                 if (buf->rb_send_bufs)
1163                         rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1164         }
1165
1166         ia->ri_ops->ro_destroy(buf);
1167
1168         kfree(buf->rb_pool);
1169 }
1170
1171 struct rpcrdma_mw *
1172 rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
1173 {
1174         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1175         struct rpcrdma_mw *mw = NULL;
1176
1177         spin_lock(&buf->rb_mwlock);
1178         if (!list_empty(&buf->rb_mws)) {
1179                 mw = list_first_entry(&buf->rb_mws,
1180                                       struct rpcrdma_mw, mw_list);
1181                 list_del_init(&mw->mw_list);
1182         }
1183         spin_unlock(&buf->rb_mwlock);
1184
1185         if (!mw)
1186                 pr_err("RPC:       %s: no MWs available\n", __func__);
1187         return mw;
1188 }
1189
1190 void
1191 rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
1192 {
1193         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1194
1195         spin_lock(&buf->rb_mwlock);
1196         list_add_tail(&mw->mw_list, &buf->rb_mws);
1197         spin_unlock(&buf->rb_mwlock);
1198 }
1199
1200 static void
1201 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1202 {
1203         buf->rb_send_bufs[--buf->rb_send_index] = req;
1204         req->rl_niovs = 0;
1205         if (req->rl_reply) {
1206                 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1207                 req->rl_reply = NULL;
1208         }
1209 }
1210
1211 /*
1212  * Get a set of request/reply buffers.
1213  *
1214  * Reply buffer (if needed) is attached to send buffer upon return.
1215  * Rule:
1216  *    rb_send_index and rb_recv_index MUST always be pointing to the
1217  *    *next* available buffer (non-NULL). They are incremented after
1218  *    removing buffers, and decremented *before* returning them.
1219  */
1220 struct rpcrdma_req *
1221 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1222 {
1223         struct rpcrdma_req *req;
1224         unsigned long flags;
1225
1226         spin_lock_irqsave(&buffers->rb_lock, flags);
1227
1228         if (buffers->rb_send_index == buffers->rb_max_requests) {
1229                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1230                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1231                 return NULL;
1232         }
1233
1234         req = buffers->rb_send_bufs[buffers->rb_send_index];
1235         if (buffers->rb_send_index < buffers->rb_recv_index) {
1236                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1237                         __func__,
1238                         buffers->rb_recv_index - buffers->rb_send_index);
1239                 req->rl_reply = NULL;
1240         } else {
1241                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1242                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1243         }
1244         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1245
1246         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1247         return req;
1248 }
1249
1250 /*
1251  * Put request/reply buffers back into pool.
1252  * Pre-decrement counter/array index.
1253  */
1254 void
1255 rpcrdma_buffer_put(struct rpcrdma_req *req)
1256 {
1257         struct rpcrdma_buffer *buffers = req->rl_buffer;
1258         unsigned long flags;
1259
1260         spin_lock_irqsave(&buffers->rb_lock, flags);
1261         rpcrdma_buffer_put_sendbuf(req, buffers);
1262         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1263 }
1264
1265 /*
1266  * Recover reply buffers from pool.
1267  * This happens when recovering from error conditions.
1268  * Post-increment counter/array index.
1269  */
1270 void
1271 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1272 {
1273         struct rpcrdma_buffer *buffers = req->rl_buffer;
1274         unsigned long flags;
1275
1276         spin_lock_irqsave(&buffers->rb_lock, flags);
1277         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1278                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1279                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1280         }
1281         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1282 }
1283
1284 /*
1285  * Put reply buffers back into pool when not attached to
1286  * request. This happens in error conditions.
1287  */
1288 void
1289 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1290 {
1291         struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1292         unsigned long flags;
1293
1294         spin_lock_irqsave(&buffers->rb_lock, flags);
1295         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1296         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1297 }
1298
1299 /*
1300  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1301  */
1302
1303 void
1304 rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1305 {
1306         dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1307                 seg->mr_offset,
1308                 (unsigned long long)seg->mr_dma, seg->mr_dmalen);
1309 }
1310
1311 static int
1312 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1313                                 struct ib_mr **mrp, struct ib_sge *iov)
1314 {
1315         struct ib_phys_buf ipb;
1316         struct ib_mr *mr;
1317         int rc;
1318
1319         /*
1320          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1321          */
1322         iov->addr = ib_dma_map_single(ia->ri_device,
1323                         va, len, DMA_BIDIRECTIONAL);
1324         if (ib_dma_mapping_error(ia->ri_device, iov->addr))
1325                 return -ENOMEM;
1326
1327         iov->length = len;
1328
1329         if (ia->ri_have_dma_lkey) {
1330                 *mrp = NULL;
1331                 iov->lkey = ia->ri_dma_lkey;
1332                 return 0;
1333         } else if (ia->ri_bind_mem != NULL) {
1334                 *mrp = NULL;
1335                 iov->lkey = ia->ri_bind_mem->lkey;
1336                 return 0;
1337         }
1338
1339         ipb.addr = iov->addr;
1340         ipb.size = iov->length;
1341         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1342                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1343
1344         dprintk("RPC:       %s: phys convert: 0x%llx "
1345                         "registered 0x%llx length %d\n",
1346                         __func__, (unsigned long long)ipb.addr,
1347                         (unsigned long long)iov->addr, len);
1348
1349         if (IS_ERR(mr)) {
1350                 *mrp = NULL;
1351                 rc = PTR_ERR(mr);
1352                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1353         } else {
1354                 *mrp = mr;
1355                 iov->lkey = mr->lkey;
1356                 rc = 0;
1357         }
1358
1359         return rc;
1360 }
1361
1362 static int
1363 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1364                                 struct ib_mr *mr, struct ib_sge *iov)
1365 {
1366         int rc;
1367
1368         ib_dma_unmap_single(ia->ri_device,
1369                             iov->addr, iov->length, DMA_BIDIRECTIONAL);
1370
1371         if (mr == NULL)
1372                 return 0;
1373
1374         rc = ib_dereg_mr(mr);
1375         if (rc)
1376                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1377         return rc;
1378 }
1379
1380 /**
1381  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1382  * @ia: controlling rpcrdma_ia
1383  * @size: size of buffer to be allocated, in bytes
1384  * @flags: GFP flags
1385  *
1386  * Returns pointer to private header of an area of internally
1387  * registered memory, or an ERR_PTR. The registered buffer follows
1388  * the end of the private header.
1389  *
1390  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1391  * receiving the payload of RDMA RECV operations. regbufs are not
1392  * used for RDMA READ/WRITE operations, thus are registered only for
1393  * LOCAL access.
1394  */
1395 struct rpcrdma_regbuf *
1396 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1397 {
1398         struct rpcrdma_regbuf *rb;
1399         int rc;
1400
1401         rc = -ENOMEM;
1402         rb = kmalloc(sizeof(*rb) + size, flags);
1403         if (rb == NULL)
1404                 goto out;
1405
1406         rb->rg_size = size;
1407         rb->rg_owner = NULL;
1408         rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1409                                        &rb->rg_mr, &rb->rg_iov);
1410         if (rc)
1411                 goto out_free;
1412
1413         return rb;
1414
1415 out_free:
1416         kfree(rb);
1417 out:
1418         return ERR_PTR(rc);
1419 }
1420
1421 /**
1422  * rpcrdma_free_regbuf - deregister and free registered buffer
1423  * @ia: controlling rpcrdma_ia
1424  * @rb: regbuf to be deregistered and freed
1425  */
1426 void
1427 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1428 {
1429         if (rb) {
1430                 rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1431                 kfree(rb);
1432         }
1433 }
1434
1435 /*
1436  * Prepost any receive buffer, then post send.
1437  *
1438  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1439  */
1440 int
1441 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1442                 struct rpcrdma_ep *ep,
1443                 struct rpcrdma_req *req)
1444 {
1445         struct ib_send_wr send_wr, *send_wr_fail;
1446         struct rpcrdma_rep *rep = req->rl_reply;
1447         int rc;
1448
1449         if (rep) {
1450                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1451                 if (rc)
1452                         goto out;
1453                 req->rl_reply = NULL;
1454         }
1455
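        /* RPCRDMA_IGNORE_COMPLETION marks a send whose completion needs
         * no per-request processing; rpcrdma_sendcq_process_wc only
         * reports non-flush errors for such WRs.
         */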
1456         send_wr.next = NULL;
1457         send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
1458         send_wr.sg_list = req->rl_send_iov;
1459         send_wr.num_sge = req->rl_niovs;
1460         send_wr.opcode = IB_WR_SEND;
1461         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
1462                 ib_dma_sync_single_for_device(ia->ri_device,
1463                                               req->rl_send_iov[3].addr,
1464                                               req->rl_send_iov[3].length,
1465                                               DMA_TO_DEVICE);
1466         ib_dma_sync_single_for_device(ia->ri_device,
1467                                       req->rl_send_iov[1].addr,
1468                                       req->rl_send_iov[1].length,
1469                                       DMA_TO_DEVICE);
1470         ib_dma_sync_single_for_device(ia->ri_device,
1471                                       req->rl_send_iov[0].addr,
1472                                       req->rl_send_iov[0].length,
1473                                       DMA_TO_DEVICE);
1474
1475         if (DECR_CQCOUNT(ep) > 0)
1476                 send_wr.send_flags = 0;
1477         else { /* Provider must take a send completion every now and then */
1478                 INIT_CQCOUNT(ep);
1479                 send_wr.send_flags = IB_SEND_SIGNALED;
1480         }
1481
1482         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1483         if (rc)
1484                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1485                         rc);
1486 out:
1487         return rc;
1488 }
1489
1490 /*
1491  * (Re)post a receive buffer.
1492  */
1493 int
1494 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1495                      struct rpcrdma_ep *ep,
1496                      struct rpcrdma_rep *rep)
1497 {
1498         struct ib_recv_wr recv_wr, *recv_wr_fail;
1499         int rc;
1500
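        /* The rep pointer is stashed in wr_id so the receive completion
         * handler can recover it from the ib_wc.
         */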
1501         recv_wr.next = NULL;
1502         recv_wr.wr_id = (u64) (unsigned long) rep;
1503         recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1504         recv_wr.num_sge = 1;
1505
1506         ib_dma_sync_single_for_cpu(ia->ri_device,
1507                                    rdmab_addr(rep->rr_rdmabuf),
1508                                    rdmab_length(rep->rr_rdmabuf),
1509                                    DMA_BIDIRECTIONAL);
1510
1511         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1512
1513         if (rc)
1514                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1515                         rc);
1516         return rc;
1517 }
1518
1519 /* How many chunk list items fit within our inline buffers?
1520  */
1521 unsigned int
1522 rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1523 {
1524         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1525         int bytes, segments;
1526
1527         bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1528         bytes -= RPCRDMA_HDRLEN_MIN;
1529         if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1530                 pr_warn("RPC:       %s: inline threshold too small\n",
1531                         __func__);
1532                 return 0;
1533         }
1534
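        /* Round the segment count down to the nearest power of two:
         * fls(x) - 1 is floor(log2(x)).
         */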
1535         segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1536         dprintk("RPC:       %s: max chunk list size = %d segments\n",
1537                 __func__, segments);
1538         return segments;
1539 }