2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
43 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <asm/bitops.h>
56 #include "xprt_rdma.h"
62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63 # define RPCDBG_FACILITY RPCDBG_TRANS
71 * handle replies in tasklet context, using a single, global list
72 * the rdma tasklet function just turns around and calls the reply
73 * handler for each reply on the list
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
80 rpcrdma_run_tasklet(unsigned long data)
82 struct rpcrdma_rep *rep;
86 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
87 while (!list_empty(&rpcrdma_tasklets_g)) {
88 rep = list_entry(rpcrdma_tasklets_g.next,
89 struct rpcrdma_rep, rr_list);
90 list_del(&rep->rr_list);
91 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 rpcrdma_reply_handler(rep);
95 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
97 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
102 static const char * const async_event[] = {
107 "communication established",
108 "send queue drained",
109 "path migration successful",
111 "device fatal error",
124 #define ASYNC_MSG(status) \
125 ((status) < ARRAY_SIZE(async_event) ? \
126 async_event[(status)] : "unknown async error")
129 rpcrdma_schedule_tasklet(struct list_head *sched_list)
133 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
134 list_splice_tail(sched_list, &rpcrdma_tasklets_g);
135 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
136 tasklet_schedule(&rpcrdma_tasklet_g);
140 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
142 struct rpcrdma_ep *ep = context;
144 pr_err("RPC: %s: %s on device %s ep %p\n",
145 __func__, ASYNC_MSG(event->event),
146 event->device->name, context);
147 if (ep->rep_connected == 1) {
148 ep->rep_connected = -EIO;
149 rpcrdma_conn_func(ep);
150 wake_up_all(&ep->rep_connect_wait);
155 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
157 struct rpcrdma_ep *ep = context;
159 pr_err("RPC: %s: %s on device %s ep %p\n",
160 __func__, ASYNC_MSG(event->event),
161 event->device->name, context);
162 if (ep->rep_connected == 1) {
163 ep->rep_connected = -EIO;
164 rpcrdma_conn_func(ep);
165 wake_up_all(&ep->rep_connect_wait);
169 static const char * const wc_status[] = {
171 "local length error",
172 "local QP operation error",
173 "local EE context operation error",
174 "local protection error",
176 "memory management operation error",
177 "bad response error",
178 "local access error",
179 "remote invalid request error",
180 "remote access error",
181 "remote operation error",
182 "transport retry counter exceeded",
183 "RNR retry counter exceeded",
184 "local RDD violation error",
185 "remove invalid RD request",
187 "invalid EE context number",
188 "invalid EE context state",
190 "response timeout error",
194 #define COMPLETION_MSG(status) \
195 ((status) < ARRAY_SIZE(wc_status) ? \
196 wc_status[(status)] : "unexpected completion error")
199 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
201 /* WARNING: Only wr_id and status are reliable at this point */
202 if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
203 if (wc->status != IB_WC_SUCCESS &&
204 wc->status != IB_WC_WR_FLUSH_ERR)
205 pr_err("RPC: %s: SEND: %s\n",
206 __func__, COMPLETION_MSG(wc->status));
208 struct rpcrdma_mw *r;
210 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
211 r->mw_sendcompletion(wc);
216 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
219 int budget, count, rc;
221 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
223 wcs = ep->rep_send_wcs;
225 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
231 rpcrdma_sendcq_process_wc(wcs++);
232 } while (rc == RPCRDMA_POLLSIZE && --budget);
237 * Handle send, fast_reg_mr, and local_inv completions.
239 * Send events are typically suppressed and thus do not result
240 * in an upcall. Occasionally one is signaled, however. This
241 * prevents the provider's completion queue from wrapping and
242 * losing a completion.
245 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
247 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
250 rc = rpcrdma_sendcq_poll(cq, ep);
252 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
257 rc = ib_req_notify_cq(cq,
258 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
262 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
267 rpcrdma_sendcq_poll(cq, ep);
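/* Illustrative sketch (not part of this file): the unsignaled-send budget
 * described above. Most SENDs are posted with completions suppressed; every
 * rep_cqinit-th post requests a signaled completion so the provider can
 * retire older work and the send CQ never wraps. This assumes the
 * INIT_CQCOUNT()/DECR_CQCOUNT() helpers declared in xprt_rdma.h; it is
 * compiled out via #if 0 because it only restates the idea.
 */
#if 0
static int example_post_send_with_budget(struct rpcrdma_ep *ep,
					 struct ib_qp *qp,
					 struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr;

	if (DECR_CQCOUNT(ep) > 0) {
		/* budget remains: suppress this send's completion */
		wr->send_flags = 0;
	} else {
		/* budget exhausted: re-arm the counter and request a
		 * completion so the provider can drain the CQ */
		INIT_CQCOUNT(ep);
		wr->send_flags = IB_SEND_SIGNALED;
	}
	return ib_post_send(qp, wr, &bad_wr);
}
#endif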
271 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
273 struct rpcrdma_rep *rep =
274 (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
276 /* WARNING: Only wr_id and status are reliable at this point */
277 if (wc->status != IB_WC_SUCCESS)
280 /* status == SUCCESS means all fields in wc are trustworthy */
281 if (wc->opcode != IB_WC_RECV)
284 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
285 __func__, rep, wc->byte_len);
287 rep->rr_len = wc->byte_len;
288 ib_dma_sync_single_for_cpu(rep->rr_device,
289 rdmab_addr(rep->rr_rdmabuf),
290 rep->rr_len, DMA_FROM_DEVICE);
291 prefetch(rdmab_to_msg(rep->rr_rdmabuf));
294 list_add_tail(&rep->rr_list, sched_list);
297 if (wc->status != IB_WC_WR_FLUSH_ERR)
298 pr_err("RPC: %s: rep %p: %s\n",
299 __func__, rep, COMPLETION_MSG(wc->status));
305 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
307 struct list_head sched_list;
309 int budget, count, rc;
311 INIT_LIST_HEAD(&sched_list);
312 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
314 wcs = ep->rep_recv_wcs;
316 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
322 rpcrdma_recvcq_process_wc(wcs++, &sched_list);
323 } while (rc == RPCRDMA_POLLSIZE && --budget);
327 rpcrdma_schedule_tasklet(&sched_list);
332 * Handle receive completions.
334 * It is reentrant but processes single events at a time in order to
335 * preserve receive ordering and keep the server's credit accounting accurate.
337 * It is the responsibility of the scheduled tasklet to return
338 * recv buffers to the pool. NOTE: this affects synchronization of
339 * connection shutdown. That is, the structures required for
340 * the completion of the reply handler must remain intact until
341 * all memory has been reclaimed.
344 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
346 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
349 rc = rpcrdma_recvcq_poll(cq, ep);
351 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
356 rc = ib_req_notify_cq(cq,
357 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
361 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
366 rpcrdma_recvcq_poll(cq, ep);
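/* For orientation, the path a receive completion takes through the
 * functions above (a descriptive annotation, not new behavior):
 *
 *   rpcrdma_recvcq_upcall()            provider upcall
 *     -> rpcrdma_recvcq_poll()         batch work completions, budgeted
 *       -> rpcrdma_recvcq_process_wc() wc -> rpcrdma_rep: set rr_len,
 *                                      dma_sync, queue on sched_list
 *     -> rpcrdma_schedule_tasklet()    splice onto rpcrdma_tasklets_g
 *   rpcrdma_run_tasklet()              tasklet context
 *     -> rpcrdma_reply_handler()       parse the reply; the rep is
 *                                      eventually returned to the pool
 *                                      (see rpcrdma_recv_buffer_put)
 *
 * Hence the note above: reps and the buffer pool must outlive any
 * completions still queued for the tasklet when a connection goes down.
 */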
370 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
373 LIST_HEAD(sched_list);
375 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
376 rpcrdma_recvcq_process_wc(&wc, &sched_list);
377 if (!list_empty(&sched_list))
378 rpcrdma_schedule_tasklet(&sched_list);
379 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
380 rpcrdma_sendcq_process_wc(&wc);
383 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
384 static const char * const conn[] = {
403 #define CONNECTION_MSG(status) \
404 ((status) < ARRAY_SIZE(conn) ? \
405 conn[(status)] : "unrecognized connection error")
409 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
411 struct rpcrdma_xprt *xprt = id->context;
412 struct rpcrdma_ia *ia = &xprt->rx_ia;
413 struct rpcrdma_ep *ep = &xprt->rx_ep;
414 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
415 struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
417 struct ib_qp_attr *attr = &ia->ri_qp_attr;
418 struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
421 switch (event->event) {
422 case RDMA_CM_EVENT_ADDR_RESOLVED:
423 case RDMA_CM_EVENT_ROUTE_RESOLVED:
425 complete(&ia->ri_done);
427 case RDMA_CM_EVENT_ADDR_ERROR:
428 ia->ri_async_rc = -EHOSTUNREACH;
429 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
431 complete(&ia->ri_done);
433 case RDMA_CM_EVENT_ROUTE_ERROR:
434 ia->ri_async_rc = -ENETUNREACH;
435 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
437 complete(&ia->ri_done);
439 case RDMA_CM_EVENT_ESTABLISHED:
441 ib_query_qp(ia->ri_id->qp, attr,
442 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
444 dprintk("RPC: %s: %d responder resources"
446 __func__, attr->max_dest_rd_atomic,
447 attr->max_rd_atomic);
449 case RDMA_CM_EVENT_CONNECT_ERROR:
450 connstate = -ENOTCONN;
452 case RDMA_CM_EVENT_UNREACHABLE:
453 connstate = -ENETDOWN;
455 case RDMA_CM_EVENT_REJECTED:
456 connstate = -ECONNREFUSED;
458 case RDMA_CM_EVENT_DISCONNECTED:
459 connstate = -ECONNABORTED;
461 case RDMA_CM_EVENT_DEVICE_REMOVAL:
464 dprintk("RPC: %s: %sconnected\n",
465 __func__, connstate > 0 ? "" : "dis");
466 ep->rep_connected = connstate;
467 rpcrdma_conn_func(ep);
468 wake_up_all(&ep->rep_connect_wait);
471 dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
472 __func__, sap, rpc_get_port(sap), ep,
473 CONNECTION_MSG(event->event));
477 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
478 if (connstate == 1) {
479 int ird = attr->max_dest_rd_atomic;
480 int tird = ep->rep_remote_cma.responder_resources;
482 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
483 sap, rpc_get_port(sap),
485 ia->ri_ops->ro_displayname,
486 xprt->rx_buf.rb_max_requests,
487 ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
488 } else if (connstate < 0) {
489 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
490 sap, rpc_get_port(sap), connstate);
497 static struct rdma_cm_id *
498 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
499 struct rpcrdma_ia *ia, struct sockaddr *addr)
501 struct rdma_cm_id *id;
504 init_completion(&ia->ri_done);
506 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
509 dprintk("RPC: %s: rdma_create_id() failed %i\n",
514 ia->ri_async_rc = -ETIMEDOUT;
515 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
517 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
521 wait_for_completion_interruptible_timeout(&ia->ri_done,
522 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
523 rc = ia->ri_async_rc;
527 ia->ri_async_rc = -ETIMEDOUT;
528 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
530 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
534 wait_for_completion_interruptible_timeout(&ia->ri_done,
535 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
536 rc = ia->ri_async_rc;
548 * Drain any CQ prior to teardown.
551 rpcrdma_clean_cq(struct ib_cq *cq)
556 while (1 == ib_poll_cq(cq, 1, &wc))
560 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
561 __func__, count, wc.opcode);
565 * Exported functions.
569 * Open and initialize an Interface Adapter.
570 * o initializes fields of struct rpcrdma_ia, including
571 * interface and provider attributes and protection zone.
574 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
577 struct rpcrdma_ia *ia = &xprt->rx_ia;
578 struct ib_device_attr *devattr = &ia->ri_devattr;
580 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
581 if (IS_ERR(ia->ri_id)) {
582 rc = PTR_ERR(ia->ri_id);
585 ia->ri_device = ia->ri_id->device;
587 ia->ri_pd = ib_alloc_pd(ia->ri_device);
588 if (IS_ERR(ia->ri_pd)) {
589 rc = PTR_ERR(ia->ri_pd);
590 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
595 rc = ib_query_device(ia->ri_device, devattr);
597 dprintk("RPC: %s: ib_query_device failed %d\n",
602 if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
603 ia->ri_have_dma_lkey = 1;
604 ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
607 if (memreg == RPCRDMA_FRMR) {
608 /* Requires both frmr reg and local dma lkey */
609 if (((devattr->device_cap_flags &
610 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
611 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
612 (devattr->max_fast_reg_page_list_len == 0)) {
613 dprintk("RPC: %s: FRMR registration "
614 "not supported by HCA\n", __func__);
615 memreg = RPCRDMA_MTHCAFMR;
618 if (memreg == RPCRDMA_MTHCAFMR) {
619 if (!ia->ri_device->alloc_fmr) {
620 dprintk("RPC: %s: MTHCAFMR registration "
621 "not supported by HCA\n", __func__);
622 memreg = RPCRDMA_ALLPHYSICAL;
627 * Optionally obtain an underlying physical identity mapping in
628 * order to do a memory window-based bind. This base registration
629 * is protected from remote access - that is enabled only by binding
630 * for the specific bytes targeted during each RPC operation, and
631 * revoked after the corresponding completion, similar to a storage adapter.
636 ia->ri_ops = &rpcrdma_frwr_memreg_ops;
638 case RPCRDMA_ALLPHYSICAL:
639 ia->ri_ops = &rpcrdma_physical_memreg_ops;
640 mem_priv = IB_ACCESS_LOCAL_WRITE |
641 IB_ACCESS_REMOTE_WRITE |
642 IB_ACCESS_REMOTE_READ;
644 case RPCRDMA_MTHCAFMR:
645 ia->ri_ops = &rpcrdma_fmr_memreg_ops;
646 if (ia->ri_have_dma_lkey)
648 mem_priv = IB_ACCESS_LOCAL_WRITE;
650 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
651 if (IS_ERR(ia->ri_bind_mem)) {
652 printk(KERN_ALERT "%s: ib_get_dma_mr for "
653 "phys register failed with %lX\n",
654 __func__, PTR_ERR(ia->ri_bind_mem));
660 printk(KERN_ERR "RPC: Unsupported memory "
661 "registration mode: %d\n", memreg);
665 dprintk("RPC: %s: memory registration strategy is '%s'\n",
666 __func__, ia->ri_ops->ro_displayname);
668 rwlock_init(&ia->ri_qplock);
672 ib_dealloc_pd(ia->ri_pd);
675 rdma_destroy_id(ia->ri_id);
682 * Clean up/close an IA.
683 * o if event handles and PD have been initialized, free them.
687 rpcrdma_ia_close(struct rpcrdma_ia *ia)
691 dprintk("RPC: %s: entering\n", __func__);
692 if (ia->ri_bind_mem != NULL) {
693 rc = ib_dereg_mr(ia->ri_bind_mem);
694 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
698 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
700 rdma_destroy_qp(ia->ri_id);
701 rdma_destroy_id(ia->ri_id);
705 /* If the pd is still busy, xprtrdma missed freeing a resource */
706 if (ia->ri_pd && !IS_ERR(ia->ri_pd))
707 WARN_ON(ib_dealloc_pd(ia->ri_pd));
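/* Illustrative sketch (not part of this file): the order in which a
 * transport is expected to use the exported functions in this file. The
 * xprt, sap and memreg values are placeholders, and error unwinding is
 * trimmed to keep the shape visible. Compiled out via #if 0.
 */
#if 0
static int example_transport_setup(struct rpcrdma_xprt *xprt,
				   struct sockaddr *sap, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
	int rc;

	rc = rpcrdma_ia_open(xprt, sap, memreg);	/* cm_id, PD, memreg ops */
	if (rc)
		return rc;
	rc = rpcrdma_ep_create(ep, ia, &xprt->rx_data);	/* CQs, QP attributes */
	if (rc)
		goto out_close;
	rc = rpcrdma_buffer_create(xprt);		/* req/rep/MR pools */
	if (rc)
		goto out_destroy;
	return rpcrdma_ep_connect(ep, ia);		/* QP + rdma_connect */

out_destroy:
	rpcrdma_ep_destroy(ep, ia);
out_close:
	rpcrdma_ia_close(ia);
	return rc;
}

/* Teardown runs in the reverse order: disconnect the endpoint, destroy
 * the buffer pool, destroy the endpoint, then close the adapter. */
#endif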
711 * Create unconnected endpoint.
714 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
715 struct rpcrdma_create_data_internal *cdata)
717 struct ib_device_attr *devattr = &ia->ri_devattr;
718 struct ib_cq *sendcq, *recvcq;
721 /* check provider's send/recv wr limits */
722 if (cdata->max_requests > devattr->max_qp_wr)
723 cdata->max_requests = devattr->max_qp_wr;
725 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
726 ep->rep_attr.qp_context = ep;
727 ep->rep_attr.srq = NULL;
728 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
729 rc = ia->ri_ops->ro_open(ia, ep, cdata);
732 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
733 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
734 ep->rep_attr.cap.max_recv_sge = 1;
735 ep->rep_attr.cap.max_inline_data = 0;
736 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
737 ep->rep_attr.qp_type = IB_QPT_RC;
738 ep->rep_attr.port_num = ~0;
740 if (cdata->padding) {
741 ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
743 if (IS_ERR(ep->rep_padbuf))
744 return PTR_ERR(ep->rep_padbuf);
746 ep->rep_padbuf = NULL;
748 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
749 "iovs: send %d recv %d\n",
751 ep->rep_attr.cap.max_send_wr,
752 ep->rep_attr.cap.max_recv_wr,
753 ep->rep_attr.cap.max_send_sge,
754 ep->rep_attr.cap.max_recv_sge);
756 /* set trigger for requesting send completion */
757 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
758 if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
759 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
760 else if (ep->rep_cqinit <= 2)
763 init_waitqueue_head(&ep->rep_connect_wait);
764 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
766 sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
767 rpcrdma_cq_async_error_upcall, ep,
768 ep->rep_attr.cap.max_send_wr + 1, 0);
769 if (IS_ERR(sendcq)) {
770 rc = PTR_ERR(sendcq);
771 dprintk("RPC: %s: failed to create send CQ: %i\n",
776 rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
778 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
783 recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
784 rpcrdma_cq_async_error_upcall, ep,
785 ep->rep_attr.cap.max_recv_wr + 1, 0);
786 if (IS_ERR(recvcq)) {
787 rc = PTR_ERR(recvcq);
788 dprintk("RPC: %s: failed to create recv CQ: %i\n",
793 rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
795 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
797 ib_destroy_cq(recvcq);
801 ep->rep_attr.send_cq = sendcq;
802 ep->rep_attr.recv_cq = recvcq;
804 /* Initialize cma parameters */
806 /* RPC/RDMA does not use private data */
807 ep->rep_remote_cma.private_data = NULL;
808 ep->rep_remote_cma.private_data_len = 0;
810 /* Client offers RDMA Read but does not initiate */
811 ep->rep_remote_cma.initiator_depth = 0;
812 if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */
813 ep->rep_remote_cma.responder_resources = 32;
815 ep->rep_remote_cma.responder_resources =
816 devattr->max_qp_rd_atom;
818 ep->rep_remote_cma.retry_count = 7;
819 ep->rep_remote_cma.flow_control = 0;
820 ep->rep_remote_cma.rnr_retry_count = 0;
825 err = ib_destroy_cq(sendcq);
827 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
830 rpcrdma_free_regbuf(ia, ep->rep_padbuf);
837 * Disconnect and destroy endpoint. After this, the only
838 * valid operations on the ep are to free it (if dynamically
839 * allocated) or re-create it.
842 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
846 dprintk("RPC: %s: entering, connected is %d\n",
847 __func__, ep->rep_connected);
849 cancel_delayed_work_sync(&ep->rep_connect_worker);
852 rpcrdma_ep_disconnect(ep, ia);
853 rdma_destroy_qp(ia->ri_id);
854 ia->ri_id->qp = NULL;
857 rpcrdma_free_regbuf(ia, ep->rep_padbuf);
859 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
860 rc = ib_destroy_cq(ep->rep_attr.recv_cq);
862 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
865 rpcrdma_clean_cq(ep->rep_attr.send_cq);
866 rc = ib_destroy_cq(ep->rep_attr.send_cq);
868 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
873 * Connect unconnected endpoint.
876 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
878 struct rdma_cm_id *id, *old;
882 if (ep->rep_connected != 0) {
883 struct rpcrdma_xprt *xprt;
885 dprintk("RPC: %s: reconnecting...\n", __func__);
887 rpcrdma_ep_disconnect(ep, ia);
888 rpcrdma_flush_cqs(ep);
890 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
891 id = rpcrdma_create_id(xprt, ia,
892 (struct sockaddr *)&xprt->rx_data.addr);
897 /* TEMP TEMP TEMP - fail if new device:
898 * Deregister/remarshal *all* requests!
899 * Close and recreate adapter, pd, etc!
900 * Re-determine all attributes still sane!
901 * More stuff I haven't thought of!
904 if (ia->ri_device != id->device) {
905 printk("RPC: %s: can't reconnect on "
906 "different device!\n", __func__);
912 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
914 dprintk("RPC: %s: rdma_create_qp failed %i\n",
921 write_lock(&ia->ri_qplock);
924 write_unlock(&ia->ri_qplock);
926 rdma_destroy_qp(old);
927 rdma_destroy_id(old);
929 dprintk("RPC: %s: connecting...\n", __func__);
930 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
932 dprintk("RPC: %s: rdma_create_qp failed %i\n",
934 /* do not update ep->rep_connected */
939 ep->rep_connected = 0;
941 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
943 dprintk("RPC: %s: rdma_connect() failed with %i\n",
948 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
951 * Check state. A non-peer reject indicates no listener
952 * (ECONNREFUSED), which may be a transient state. All
953 * others indicate a transport condition which has already
954 * undergone a best-effort recovery attempt.
956 if (ep->rep_connected == -ECONNREFUSED &&
957 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
958 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
961 if (ep->rep_connected <= 0) {
962 /* Sometimes, the only way to reliably connect to remote
963 * CMs is to use the same nonzero values for ORD and IRD. */
964 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
965 (ep->rep_remote_cma.responder_resources == 0 ||
966 ep->rep_remote_cma.initiator_depth !=
967 ep->rep_remote_cma.responder_resources)) {
968 if (ep->rep_remote_cma.responder_resources == 0)
969 ep->rep_remote_cma.responder_resources = 1;
970 ep->rep_remote_cma.initiator_depth =
971 ep->rep_remote_cma.responder_resources;
974 rc = ep->rep_connected;
976 dprintk("RPC: %s: connected\n", __func__);
981 ep->rep_connected = rc;
986 * rpcrdma_ep_disconnect
988 * This is separate from destroy to facilitate the ability
989 * to reconnect without recreating the endpoint.
991 * This call is not reentrant, and must not be made in parallel
992 * on the same endpoint.
995 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
999 rpcrdma_flush_cqs(ep);
1000 rc = rdma_disconnect(ia->ri_id);
1002 /* returns without wait if not connected */
1003 wait_event_interruptible(ep->rep_connect_wait,
1004 ep->rep_connected != 1);
1005 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
1006 (ep->rep_connected == 1) ? "still " : "dis");
1008 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
1009 ep->rep_connected = rc;
1013 static struct rpcrdma_req *
1014 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1016 struct rpcrdma_req *req;
1018 req = kzalloc(sizeof(*req), GFP_KERNEL);
1020 return ERR_PTR(-ENOMEM);
1022 req->rl_buffer = &r_xprt->rx_buf;
1026 static struct rpcrdma_rep *
1027 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1029 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1030 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1031 struct rpcrdma_rep *rep;
1035 rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1039 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1041 if (IS_ERR(rep->rr_rdmabuf)) {
1042 rc = PTR_ERR(rep->rr_rdmabuf);
1046 rep->rr_device = ia->ri_device;
1047 rep->rr_rxprt = r_xprt;
1057 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1059 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1060 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1061 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1066 buf->rb_max_requests = cdata->max_requests;
1067 spin_lock_init(&buf->rb_lock);
1069 /* Need to allocate:
1070 * 1. arrays for send and recv pointers
1071 * 2. arrays of struct rpcrdma_req to fill in pointers
1072 * 3. array of struct rpcrdma_rep for replies
1073 * Send/recv buffers in req/rep need to be registered
1075 len = buf->rb_max_requests *
1076 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1078 p = kzalloc(len, GFP_KERNEL);
1080 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1085 buf->rb_pool = p; /* for freeing it later */
1087 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1088 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1089 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1090 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1092 rc = ia->ri_ops->ro_init(r_xprt);
1096 for (i = 0; i < buf->rb_max_requests; i++) {
1097 struct rpcrdma_req *req;
1098 struct rpcrdma_rep *rep;
1100 req = rpcrdma_create_req(r_xprt);
1102 dprintk("RPC: %s: request buffer %d alloc"
1103 " failed\n", __func__, i);
1107 buf->rb_send_bufs[i] = req;
1109 rep = rpcrdma_create_rep(r_xprt);
1111 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1116 buf->rb_recv_bufs[i] = rep;
1121 rpcrdma_buffer_destroy(buf);
1126 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1131 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1136 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1141 rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1142 rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1147 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1149 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1152 /* clean up in reverse order from create
1153 * 1. recv mr memory (mr free, then kfree)
1154 * 2. send mr memory (mr free, then kfree)
1157 dprintk("RPC: %s: entering\n", __func__);
1159 for (i = 0; i < buf->rb_max_requests; i++) {
1160 if (buf->rb_recv_bufs)
1161 rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1162 if (buf->rb_send_bufs)
1163 rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1166 ia->ri_ops->ro_destroy(buf);
1168 kfree(buf->rb_pool);
1172 rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
1174 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1175 struct rpcrdma_mw *mw = NULL;
1177 spin_lock(&buf->rb_mwlock);
1178 if (!list_empty(&buf->rb_mws)) {
1179 mw = list_first_entry(&buf->rb_mws,
1180 struct rpcrdma_mw, mw_list);
1181 list_del_init(&mw->mw_list);
1183 spin_unlock(&buf->rb_mwlock);
1186 pr_err("RPC: %s: no MWs available\n", __func__);
1191 rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
1193 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1195 spin_lock(&buf->rb_mwlock);
1196 list_add_tail(&mw->mw_list, &buf->rb_mws);
1197 spin_unlock(&buf->rb_mwlock);
1201 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1203 buf->rb_send_bufs[--buf->rb_send_index] = req;
1205 if (req->rl_reply) {
1206 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1207 req->rl_reply = NULL;
1212 * Get a set of request/reply buffers.
1214 * Reply buffer (if needed) is attached to send buffer upon return.
1216 * rb_send_index and rb_recv_index MUST always be pointing to the
1217 * *next* available buffer (non-NULL). They are incremented after
1218 * removing buffers, and decremented *before* returning them.
1220 struct rpcrdma_req *
1221 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1223 struct rpcrdma_req *req;
1224 unsigned long flags;
1226 spin_lock_irqsave(&buffers->rb_lock, flags);
1228 if (buffers->rb_send_index == buffers->rb_max_requests) {
1229 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1230 dprintk("RPC: %s: out of request buffers\n", __func__);
1231 return ((struct rpcrdma_req *)NULL);
1234 req = buffers->rb_send_bufs[buffers->rb_send_index];
1235 if (buffers->rb_send_index < buffers->rb_recv_index) {
1236 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1238 buffers->rb_recv_index - buffers->rb_send_index);
1239 req->rl_reply = NULL;
1241 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1242 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1244 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1246 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1251 * Put request/reply buffers back into pool.
1252 * Pre-decrement counter/array index.
1255 rpcrdma_buffer_put(struct rpcrdma_req *req)
1257 struct rpcrdma_buffer *buffers = req->rl_buffer;
1258 unsigned long flags;
1260 spin_lock_irqsave(&buffers->rb_lock, flags);
1261 rpcrdma_buffer_put_sendbuf(req, buffers);
1262 spin_unlock_irqrestore(&buffers->rb_lock, flags);
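/* Illustrative sketch (not part of this file): how a caller pairs
 * rpcrdma_buffer_get() and rpcrdma_buffer_put() around one RPC. The
 * marshaling step is elided (it lives in rpc_rdma.c); compiled out
 * via #if 0.
 */
#if 0
static int example_send_one_rpc(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_req *req;
	int rc;

	/* Take a send buffer; a reply buffer rides along in rl_reply */
	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (!req)
		return -ENOMEM;

	/* ... marshal the RPC into req->rl_send_iov here ... */

	rc = rpcrdma_ep_post(ia, ep, req);	/* preposts rl_reply, posts SEND */
	if (rc)
		rpcrdma_buffer_put(req);	/* return the buffer(s) on error */
	return rc;
}
#endif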
1266 * Recover reply buffers from pool.
1267 * This happens when recovering from error conditions.
1268 * Post-increment counter/array index.
1271 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1273 struct rpcrdma_buffer *buffers = req->rl_buffer;
1274 unsigned long flags;
1276 spin_lock_irqsave(&buffers->rb_lock, flags);
1277 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1278 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1279 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1281 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1285 * Put reply buffers back into pool when not attached to
1286 * request. This happens in error conditions.
1289 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1291 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1292 unsigned long flags;
1294 spin_lock_irqsave(&buffers->rb_lock, flags);
1295 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1296 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1300 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1304 rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1306 dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
1308 (unsigned long long)seg->mr_dma, seg->mr_dmalen);
1312 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1313 struct ib_mr **mrp, struct ib_sge *iov)
1315 struct ib_phys_buf ipb;
1320 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1322 iov->addr = ib_dma_map_single(ia->ri_device,
1323 va, len, DMA_BIDIRECTIONAL);
1324 if (ib_dma_mapping_error(ia->ri_device, iov->addr))
1329 if (ia->ri_have_dma_lkey) {
1331 iov->lkey = ia->ri_dma_lkey;
1333 } else if (ia->ri_bind_mem != NULL) {
1335 iov->lkey = ia->ri_bind_mem->lkey;
1339 ipb.addr = iov->addr;
1340 ipb.size = iov->length;
1341 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1342 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1344 dprintk("RPC: %s: phys convert: 0x%llx "
1345 "registered 0x%llx length %d\n",
1346 __func__, (unsigned long long)ipb.addr,
1347 (unsigned long long)iov->addr, len);
1352 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1355 iov->lkey = mr->lkey;
1363 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1364 struct ib_mr *mr, struct ib_sge *iov)
1368 ib_dma_unmap_single(ia->ri_device,
1369 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1374 rc = ib_dereg_mr(mr);
1376 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1381 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1382 * @ia: controlling rpcrdma_ia
1383 * @size: size of buffer to be allocated, in bytes
1386 * Returns pointer to private header of an area of internally
1387 * registered memory, or an ERR_PTR. The registered buffer follows
1388 * the end of the private header.
1390 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1391 * receiving the payload of RDMA RECV operations. regbufs are not
1392 * used for RDMA READ/WRITE operations, and thus are registered only for local access.
1395 struct rpcrdma_regbuf *
1396 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1398 struct rpcrdma_regbuf *rb;
1402 rb = kmalloc(sizeof(*rb) + size, flags);
1407 rb->rg_owner = NULL;
1408 rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1409 &rb->rg_mr, &rb->rg_iov);
1422 * rpcrdma_free_regbuf - deregister and free registered buffer
1423 * @ia: controlling rpcrdma_ia
1424 * @rb: regbuf to be deregistered and freed
1427 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1430 rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
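/* Illustrative sketch (not part of this file): allocating a regbuf for a
 * small control message and using its pre-built SGE. rg_iov is filled in
 * by rpcrdma_register_internal() at allocation time, so a caller only
 * copies its payload into the buffer that follows the private header.
 * Compiled out via #if 0.
 */
#if 0
static struct rpcrdma_regbuf *example_alloc_control_buf(struct rpcrdma_ia *ia)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge sge;

	rb = rpcrdma_alloc_regbuf(ia, 256, GFP_KERNEL);
	if (IS_ERR(rb))
		return rb;

	/* ... place up to 256 bytes of payload at rb->rg_base ... */

	/* The SGE for posting comes straight from the regbuf: */
	sge = rb->rg_iov;	/* addr, length and lkey set at alloc time */

	/* The buffer normally lives as long as the req/rep that owns it,
	 * and is released with rpcrdma_free_regbuf() at teardown. */
	return rb;
}
#endif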
1436 * Prepost any receive buffer, then post send.
1438 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1441 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1442 struct rpcrdma_ep *ep,
1443 struct rpcrdma_req *req)
1445 struct ib_send_wr send_wr, *send_wr_fail;
1446 struct rpcrdma_rep *rep = req->rl_reply;
1450 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1453 req->rl_reply = NULL;
1456 send_wr.next = NULL;
1457 send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
1458 send_wr.sg_list = req->rl_send_iov;
1459 send_wr.num_sge = req->rl_niovs;
1460 send_wr.opcode = IB_WR_SEND;
1461 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1462 ib_dma_sync_single_for_device(ia->ri_device,
1463 req->rl_send_iov[3].addr,
1464 req->rl_send_iov[3].length,
1466 ib_dma_sync_single_for_device(ia->ri_device,
1467 req->rl_send_iov[1].addr,
1468 req->rl_send_iov[1].length,
1470 ib_dma_sync_single_for_device(ia->ri_device,
1471 req->rl_send_iov[0].addr,
1472 req->rl_send_iov[0].length,
1475 if (DECR_CQCOUNT(ep) > 0)
1476 send_wr.send_flags = 0;
1477 else { /* Provider must take a send completion every now and then */
1479 send_wr.send_flags = IB_SEND_SIGNALED;
1482 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1484 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1491 * (Re)post a receive buffer.
1494 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1495 struct rpcrdma_ep *ep,
1496 struct rpcrdma_rep *rep)
1498 struct ib_recv_wr recv_wr, *recv_wr_fail;
1501 recv_wr.next = NULL;
1502 recv_wr.wr_id = (u64) (unsigned long) rep;
1503 recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1504 recv_wr.num_sge = 1;
1506 ib_dma_sync_single_for_cpu(ia->ri_device,
1507 rdmab_addr(rep->rr_rdmabuf),
1508 rdmab_length(rep->rr_rdmabuf),
1511 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1514 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
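/* Illustrative sketch (not part of this file): once the reply handler is
 * finished with a rep, the receive buffer is either re-posted directly or
 * parked back in the pool for the next rpcrdma_ep_post(). Compiled out
 * via #if 0; error handling trimmed.
 */
#if 0
static void example_recycle_rep(struct rpcrdma_xprt *r_xprt,
				struct rpcrdma_rep *rep)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;

	if (ep->rep_connected == 1) {
		/* connection still up: hand the buffer back to the HCA */
		if (rpcrdma_ep_post_recv(ia, ep, rep))
			rpcrdma_recv_buffer_put(rep);
	} else {
		/* shutting down: just return it to the pool */
		rpcrdma_recv_buffer_put(rep);
	}
}
#endif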
1519 /* How many chunk list items fit within our inline buffers?
1522 rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1524 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1525 int bytes, segments;
1527 bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1528 bytes -= RPCRDMA_HDRLEN_MIN;
1529 if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1530 pr_warn("RPC: %s: inline threshold too small\n",
1535 segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1536 dprintk("RPC: %s: max chunk list size = %d segments\n",
1537 __func__, segments);
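/* Worked example for the calculation above (illustrative figures, assuming
 * RPCRDMA_HDRLEN_MIN is 28 bytes and struct rpcrdma_segment is 16 bytes):
 * with 1024-byte inline buffers, 1024 - 28 = 996 bytes remain for the
 * chunk list, or 62 segments; the power-of-two rounding,
 * 1 << (fls(62) - 1), reduces that to 32 segments.
 */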