From da5a06cef5724737af4315715632f0a07dd5e116 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Thu, 14 Jan 2010 12:18:11 -0800 Subject: [PATCH] RDS: rewrite rds_ib_xmit Now that the header always goes first, it is possible to simplify rds_ib_xmit. Instead of having a path to handle 0-byte dgrams and another path to handle >0, these can both be handled in one path. This lets us eliminate xmit_populate_wr(). Rename sent to bytes_sent, to differentiate better from other variable named "send". Signed-off-by: Andy Grover --- net/rds/ib_send.c | 123 +++++++++++++++++----------------------------- 1 file changed, 45 insertions(+), 78 deletions(-) diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 46026d9091f1..06c1d7e032d2 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -425,38 +425,6 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted) set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); } -static inline void -rds_ib_xmit_populate_wr(struct rds_ib_connection *ic, - struct rds_ib_send_work *send, unsigned int pos, - unsigned long buffer, unsigned int length, - int send_flags) -{ - struct ib_sge *sge; - - WARN_ON(pos != send - ic->i_sends); - - send->s_wr.send_flags = send_flags; - send->s_wr.opcode = IB_WR_SEND; - send->s_wr.num_sge = 1; - send->s_wr.next = NULL; - send->s_queued = jiffies; - send->s_op = NULL; - - sge = &send->s_sge[0]; - sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header)); - sge->length = sizeof(struct rds_header); - sge->lkey = ic->i_mr->lkey; - - if (length != 0) { - send->s_wr.num_sge = 2; - - sge = &send->s_sge[1]; - sge->addr = buffer; - sge->length = length; - sge->lkey = ic->i_mr->lkey; - } -} - /* * This can be called multiple times for a given message. The first time * we see a message we map its scatterlist into the IB device so that @@ -483,11 +451,11 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, u32 pos; u32 i; u32 work_alloc; - u32 credit_alloc; + u32 credit_alloc = 0; u32 posted; u32 adv_credits = 0; int send_flags = 0; - int sent; + int bytes_sent = 0; int ret; int flow_controlled = 0; @@ -515,7 +483,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, goto out; } - credit_alloc = work_alloc; if (ic->i_flowctl) { credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT); adv_credits += posted; @@ -591,13 +558,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, BUG_ON(adv_credits > 255); } - send = &ic->i_sends[pos]; - first = send; - prev = NULL; - scat = &rm->data.m_sg[sg]; - sent = 0; - i = 0; - /* Sometimes you want to put a fence between an RDMA * READ and the following SEND. * We could either do this all the time @@ -607,31 +567,45 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, if (rm->rdma.m_rdma_op.r_active && rm->rdma.m_rdma_op.r_fence) send_flags = IB_SEND_FENCE; - /* - * We could be copying the header into the unused tail of the page. - * That would need to be changed in the future when those pages might - * be mapped userspace pages or page cache pages. So instead we always - * use a second sge and our long-lived ring of mapped headers. We send - * the header after the data so that the data payload can be aligned on - * the receiver. - */ + /* Each frag gets a header. Msgs may be 0 bytes */ + send = &ic->i_sends[pos]; + first = send; + prev = NULL; + scat = &rm->data.m_sg[sg]; + i = 0; + do { + unsigned int len = 0; - /* handle a 0-len message */ - if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) { - rds_ib_xmit_populate_wr(ic, send, pos, 0, 0, send_flags); - goto add_header; - } + /* Set up the header */ + send->s_wr.send_flags = send_flags; + send->s_wr.opcode = IB_WR_SEND; + send->s_wr.num_sge = 1; + send->s_wr.next = NULL; + send->s_queued = jiffies; + send->s_op = NULL; - /* if there's data reference it with a chain of work reqs */ - for (; i < work_alloc && scat != &rm->data.m_sg[rm->data.m_count]; i++) { - unsigned int len; + send->s_sge[0].addr = ic->i_send_hdrs_dma + + (pos * sizeof(struct rds_header)); + send->s_sge[0].length = sizeof(struct rds_header); - send = &ic->i_sends[pos]; + memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header)); + + /* Set up the data, if present */ + if (i < work_alloc + && scat != &rm->data.m_sg[rm->data.m_count]) { + len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off); + send->s_wr.num_sge = 2; - len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off); - rds_ib_xmit_populate_wr(ic, send, pos, - ib_sg_dma_address(dev, scat) + off, len, - send_flags); + send->s_sge[1].addr = ib_sg_dma_address(dev, scat) + off; + send->s_sge[1].length = len; + + bytes_sent += len; + off += len; + if (off == ib_sg_dma_len(dev, scat)) { + scat++; + off = 0; + } + } /* * We want to delay signaling completions just enough to get @@ -658,18 +632,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, rdsdebug("send %p wr %p num_sge %u next %p\n", send, &send->s_wr, send->s_wr.num_sge, send->s_wr.next); - sent += len; - off += len; - if (off == ib_sg_dma_len(dev, scat)) { - scat++; - off = 0; - } - -add_header: - /* Tack on the header after the data. The header SGE should already - * have been set up to point to the right header buffer. */ - memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header)); - if (adv_credits) { struct rds_header *hdr = &ic->i_send_hdrs[pos]; @@ -685,12 +647,16 @@ add_header: prev = send; pos = (pos + 1) % ic->i_send_ring.w_nr; - } + send = &ic->i_sends[pos]; + i++; + + } while (i < work_alloc + && scat != &rm->data.m_sg[rm->data.m_count]); /* Account the RDS header in the number of bytes we sent, but just once. * The caller has no concept of fragmentation. */ if (hdr_off == 0) - sent += sizeof(struct rds_header); + bytes_sent += sizeof(struct rds_header); /* if we finished the message then send completion owns it */ if (scat == &rm->data.m_sg[rm->data.m_count]) { @@ -699,6 +665,7 @@ add_header: ic->i_rm = NULL; } + /* Put back wrs & credits we didn't use */ if (i < work_alloc) { rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i); work_alloc = i; @@ -725,7 +692,7 @@ add_header: goto out; } - ret = sent; + ret = bytes_sent; out: BUG_ON(adv_credits); return ret; -- 2.34.1