RDS: Implement silent atomics
author Andy Grover <andy.grover@oracle.com>
Wed, 20 Jan 2010 05:25:26 +0000 (21:25 -0800)
committer Andy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:11:55 +0000 (18:11 -0700)
Signed-off-by: Andy Grover <andy.grover@oracle.com>
net/rds/ib.h
net/rds/ib_send.c
net/rds/message.c
net/rds/rds.h
net/rds/send.c

index 148818174a0453f72b2eb23c9d73f89ea65b9758..96769b86a5368f4e75c985b0be4344f95418e48a 100644 (file)
@@ -336,7 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
                             u32 *adv_credits, int need_posted, int max_posted);
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm);
 
 /* ib_stats.c */
 DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
index d839b403d46bfa1675d7a3627f4998ba4de6324d..e6745d827c3a995ae9b174e74e3c61c681e322c7 100644 (file)
@@ -225,15 +225,12 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
                        /* In the error case, wc.opcode sometimes contains garbage */
                        switch (send->s_wr.opcode) {
                        case IB_WR_SEND:
-                               if (send->s_rm)
-                                       rds_ib_send_unmap_rm(ic, send, wc.status);
-                               break;
                        case IB_WR_RDMA_WRITE:
                        case IB_WR_RDMA_READ:
                        case IB_WR_ATOMIC_FETCH_AND_ADD:
                        case IB_WR_ATOMIC_CMP_AND_SWP:
-                               /* Nothing to be done - the SG list will be unmapped
-                                * when the SEND completes. */
+                               if (send->s_rm)
+                                       rds_ib_send_unmap_rm(ic, send, wc.status);
                                break;
                        default:
                                if (printk_ratelimit())
@@ -425,6 +422,21 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
+static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+                                             struct rds_ib_send_work *send,
+                                             bool notify)
+{
+       /*
+        * Signal once the unsignaled budget reaches 0, or when notify is
+        * set: delayed enough to batch completions, not so much that the
+        * wire sits idle. The flag is OR-ed in; other send_flags are kept.
+        */
+       if (ic->i_unsignaled_wrs-- == 0 || notify) {
+               ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
+               send->s_wr.send_flags |= IB_SEND_SIGNALED;
+       }
+}
+
 /*
  * This can be called multiple times for a given message.  The first time
  * we see a message we map its scatterlist into the IB device so that
@@ -517,7 +529,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                        rm->data.m_count = 0;
                }
 
-               ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
                rds_message_addref(rm);
                ic->i_rm = rm;
 
@@ -608,15 +619,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                        }
                }
 
-               /*
-                * We want to delay signaling completions just enough to get
-                * the batching benefits but not so much that we create dead time
-                * on the wire.
-                */
-               if (ic->i_unsignaled_wrs-- == 0) {
-                       ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
-                       send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
-               }
+               rds_ib_set_wr_signal_state(ic, send, 0);
 
                /*
                 * Always signal the last one if we're stopping due to flow control.
@@ -656,7 +659,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        /* if we finished the message then send completion owns it */
        if (scat == &rm->data.m_sg[rm->data.m_count]) {
                prev->s_rm = ic->i_rm;
-               prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+               prev->s_wr.send_flags |= IB_SEND_SOLICITED;
                ic->i_rm = NULL;
        }
 
@@ -698,9 +701,10 @@ out:
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
+       struct rm_atomic_op *op = &rm->atomic;
        struct rds_ib_send_work *send = NULL;
        struct ib_send_wr *failed_wr;
        struct rds_ib_device *rds_ibdev;
@@ -731,12 +735,20 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
                send->s_wr.wr.atomic.compare_add = op->op_swap_add;
                send->s_wr.wr.atomic.swap = 0;
        }
-       send->s_wr.send_flags = IB_SEND_SIGNALED;
+       rds_ib_set_wr_signal_state(ic, send, op->op_notify);
        send->s_wr.num_sge = 1;
        send->s_wr.next = NULL;
        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
        send->s_wr.wr.atomic.rkey = op->op_rkey;
 
+       /*
+        * If there is no data or rdma ops in the message, then
+        * we must fill in s_rm ourselves, so we properly clean up
+        * on completion.
+        */
+       if (!rm->rdma.m_rdma_op.r_active && !rm->data.op_active)
+               send->s_rm = rm;
+
        /* map 8 byte retval buffer to the device */
        ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
        rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -836,14 +848,8 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
        for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
                send->s_wr.send_flags = 0;
                send->s_queued = jiffies;
-               /*
-                * We want to delay signaling completions just enough to get
-                * the batching benefits but not so much that we create dead time on the wire.
-                */
-               if (ic->i_unsignaled_wrs-- == 0) {
-                       ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
-                       send->s_wr.send_flags = IB_SEND_SIGNALED;
-               }
+
+               rds_ib_set_wr_signal_state(ic, send, op->r_notify);
 
                send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -884,10 +890,6 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
                        send = ic->i_sends;
        }
 
-       /* if we finished the message then send completion owns it */
-       if (scat == &op->r_sg[op->r_count])
-               prev->s_wr.send_flags = IB_SEND_SIGNALED;
-
        if (i < work_alloc) {
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
                work_alloc = i;
index 3ea05c864cd4dbf694c7e952cb92a43cdeb85ffa..a27e493a63a227e9d2c518f8bbcc7b33ffdef0d9 100644 (file)
@@ -325,6 +325,8 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov,
                        sg++;
        }
 
+       rm->data.op_active = 1;
+
 out:
        return ret;
 }
index 0c610a102c2059d1edee63d0ba5d9178571bb8d3..bf2349da4db7b790dad94a2c94663629004a2e85 100644 (file)
@@ -341,6 +341,7 @@ struct rds_message {
                        struct rds_mr           *m_rdma_mr;
                } rdma;
                struct rm_data_op {
+                       unsigned int            op_active:1;
                        unsigned int            m_nents;
                        unsigned int            m_count;
                        struct scatterlist      *m_sg;
@@ -418,7 +419,7 @@ struct rds_transport {
        int (*xmit_cong_map)(struct rds_connection *conn,
                             struct rds_cong_map *map, unsigned long offset);
        int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op);
-       int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
+       int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm);
        int (*recv)(struct rds_connection *conn);
        int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
                                size_t size);
index 5bc35d2f40ea1950956a8f82328f3cd5dc0b7356..42fb934293be108d0e151256c5afa0e480030d45 100644 (file)
@@ -266,7 +266,7 @@ int rds_send_xmit(struct rds_connection *conn)
 
 
                if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
-                       ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
+                       ret = conn->c_trans->xmit_atomic(conn, rm);
                        if (ret)
                                break;
                        conn->c_xmit_atomic_sent = 1;
@@ -285,13 +285,18 @@ int rds_send_xmit(struct rds_connection *conn)
                        if (ret)
                                break;
                        conn->c_xmit_rdma_sent = 1;
+
+                       /* rdmas need data sent, even if just the header */
+                       rm->data.op_active = 1;
+
                        /* The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue */
                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                }
 
-               if (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
-                   conn->c_xmit_sg < rm->data.m_nents) {
+               if (rm->data.op_active
+                   && (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
+                       conn->c_xmit_sg < rm->data.m_nents)) {
                        ret = conn->c_trans->xmit(conn, rm,
                                                  conn->c_xmit_hdr_off,
                                                  conn->c_xmit_sg,