RDMA/cxgb4: DB Drop Recovery for RDMA and LLD queues
authorVipul Pandya <vipul@chelsio.com>
Fri, 18 May 2012 09:59:30 +0000 (15:29 +0530)
committerRoland Dreier <roland@purestorage.com>
Fri, 18 May 2012 20:22:33 +0000 (13:22 -0700)
Add module option db_fc_threshold which is the count of active QPs
that trigger automatic db flow control mode.  Automatically transition
to/from flow control mode when the active qp count crosses
db_fc_theshold.

Add more db debugfs stats

On DB DROP event from the LLD, recover all the iwarp queues.

Signed-off-by: Vipul Pandya <vipul@chelsio.com>
Signed-off-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Roland Dreier <roland@purestorage.com>
drivers/infiniband/hw/cxgb4/device.c
drivers/infiniband/hw/cxgb4/iw_cxgb4.h
drivers/infiniband/hw/cxgb4/qp.c
drivers/infiniband/hw/cxgb4/t4.h

index 9062ed90ea938ff7f97f075490534c939d30d973..bdb398f54a64e31fe498d33888b000c7239c72bf 100644 (file)
@@ -246,6 +246,8 @@ static const struct file_operations stag_debugfs_fops = {
        .llseek  = default_llseek,
 };
 
+static char *db_state_str[] = {"NORMAL", "FLOW_CONTROL", "RECOVERY"};
+
 static int stats_show(struct seq_file *seq, void *v)
 {
        struct c4iw_dev *dev = seq->private;
@@ -272,6 +274,9 @@ static int stats_show(struct seq_file *seq, void *v)
        seq_printf(seq, "  DB FULL: %10llu\n", dev->rdev.stats.db_full);
        seq_printf(seq, " DB EMPTY: %10llu\n", dev->rdev.stats.db_empty);
        seq_printf(seq, "  DB DROP: %10llu\n", dev->rdev.stats.db_drop);
+       seq_printf(seq, " DB State: %s Transitions %llu\n",
+                  db_state_str[dev->db_state],
+                  dev->rdev.stats.db_state_transitions);
        return 0;
 }
 
@@ -295,6 +300,7 @@ static ssize_t stats_clear(struct file *file, const char __user *buf,
        dev->rdev.stats.db_full = 0;
        dev->rdev.stats.db_empty = 0;
        dev->rdev.stats.db_drop = 0;
+       dev->rdev.stats.db_state_transitions = 0;
        mutex_unlock(&dev->rdev.stats.lock);
        return count;
 }
@@ -677,8 +683,11 @@ static int disable_qp_db(int id, void *p, void *data)
 static void stop_queues(struct uld_ctx *ctx)
 {
        spin_lock_irq(&ctx->dev->lock);
-       ctx->dev->db_state = FLOW_CONTROL;
-       idr_for_each(&ctx->dev->qpidr, disable_qp_db, NULL);
+       if (ctx->dev->db_state == NORMAL) {
+               ctx->dev->rdev.stats.db_state_transitions++;
+               ctx->dev->db_state = FLOW_CONTROL;
+               idr_for_each(&ctx->dev->qpidr, disable_qp_db, NULL);
+       }
        spin_unlock_irq(&ctx->dev->lock);
 }
 
@@ -693,9 +702,165 @@ static int enable_qp_db(int id, void *p, void *data)
 static void resume_queues(struct uld_ctx *ctx)
 {
        spin_lock_irq(&ctx->dev->lock);
-       ctx->dev->db_state = NORMAL;
-       idr_for_each(&ctx->dev->qpidr, enable_qp_db, NULL);
+       if (ctx->dev->qpcnt <= db_fc_threshold &&
+           ctx->dev->db_state == FLOW_CONTROL) {
+               ctx->dev->db_state = NORMAL;
+               ctx->dev->rdev.stats.db_state_transitions++;
+               idr_for_each(&ctx->dev->qpidr, enable_qp_db, NULL);
+       }
+       spin_unlock_irq(&ctx->dev->lock);
+}
+
+struct qp_list {
+       unsigned idx;
+       struct c4iw_qp **qps;
+};
+
+static int add_and_ref_qp(int id, void *p, void *data)
+{
+       struct qp_list *qp_listp = data;
+       struct c4iw_qp *qp = p;
+
+       c4iw_qp_add_ref(&qp->ibqp);
+       qp_listp->qps[qp_listp->idx++] = qp;
+       return 0;
+}
+
+static int count_qps(int id, void *p, void *data)
+{
+       unsigned *countp = data;
+       (*countp)++;
+       return 0;
+}
+
+static void deref_qps(struct qp_list qp_list)
+{
+       int idx;
+
+       for (idx = 0; idx < qp_list.idx; idx++)
+               c4iw_qp_rem_ref(&qp_list.qps[idx]->ibqp);
+}
+
+static void recover_lost_dbs(struct uld_ctx *ctx, struct qp_list *qp_list)
+{
+       int idx;
+       int ret;
+
+       for (idx = 0; idx < qp_list->idx; idx++) {
+               struct c4iw_qp *qp = qp_list->qps[idx];
+
+               ret = cxgb4_sync_txq_pidx(qp->rhp->rdev.lldi.ports[0],
+                                         qp->wq.sq.qid,
+                                         t4_sq_host_wq_pidx(&qp->wq),
+                                         t4_sq_wq_size(&qp->wq));
+               if (ret) {
+                       printk(KERN_ERR MOD "%s: Fatal error - "
+                              "DB overflow recovery failed - "
+                              "error syncing SQ qid %u\n",
+                              pci_name(ctx->lldi.pdev), qp->wq.sq.qid);
+                       return;
+               }
+
+               ret = cxgb4_sync_txq_pidx(qp->rhp->rdev.lldi.ports[0],
+                                         qp->wq.rq.qid,
+                                         t4_rq_host_wq_pidx(&qp->wq),
+                                         t4_rq_wq_size(&qp->wq));
+
+               if (ret) {
+                       printk(KERN_ERR MOD "%s: Fatal error - "
+                              "DB overflow recovery failed - "
+                              "error syncing RQ qid %u\n",
+                              pci_name(ctx->lldi.pdev), qp->wq.rq.qid);
+                       return;
+               }
+
+               /* Wait for the dbfifo to drain */
+               while (cxgb4_dbfifo_count(qp->rhp->rdev.lldi.ports[0], 1) > 0) {
+                       set_current_state(TASK_UNINTERRUPTIBLE);
+                       schedule_timeout(usecs_to_jiffies(10));
+               }
+       }
+}
+
+static void recover_queues(struct uld_ctx *ctx)
+{
+       int count = 0;
+       struct qp_list qp_list;
+       int ret;
+
+       /* lock out kernel db ringers */
+       mutex_lock(&ctx->dev->db_mutex);
+
+       /* put all queues in to recovery mode */
+       spin_lock_irq(&ctx->dev->lock);
+       ctx->dev->db_state = RECOVERY;
+       ctx->dev->rdev.stats.db_state_transitions++;
+       idr_for_each(&ctx->dev->qpidr, disable_qp_db, NULL);
+       spin_unlock_irq(&ctx->dev->lock);
+
+       /* slow everybody down */
+       set_current_state(TASK_UNINTERRUPTIBLE);
+       schedule_timeout(usecs_to_jiffies(1000));
+
+       /* Wait for the dbfifo to completely drain. */
+       while (cxgb4_dbfifo_count(ctx->dev->rdev.lldi.ports[0], 1) > 0) {
+               set_current_state(TASK_UNINTERRUPTIBLE);
+               schedule_timeout(usecs_to_jiffies(10));
+       }
+
+       /* flush the SGE contexts */
+       ret = cxgb4_flush_eq_cache(ctx->dev->rdev.lldi.ports[0]);
+       if (ret) {
+               printk(KERN_ERR MOD "%s: Fatal error - DB overflow recovery failed\n",
+                      pci_name(ctx->lldi.pdev));
+               goto out;
+       }
+
+       /* Count active queues so we can build a list of queues to recover */
+       spin_lock_irq(&ctx->dev->lock);
+       idr_for_each(&ctx->dev->qpidr, count_qps, &count);
+
+       qp_list.qps = kzalloc(count * sizeof *qp_list.qps, GFP_ATOMIC);
+       if (!qp_list.qps) {
+               printk(KERN_ERR MOD "%s: Fatal error - DB overflow recovery failed\n",
+                      pci_name(ctx->lldi.pdev));
+               spin_unlock_irq(&ctx->dev->lock);
+               goto out;
+       }
+       qp_list.idx = 0;
+
+       /* add and ref each qp so it doesn't get freed */
+       idr_for_each(&ctx->dev->qpidr, add_and_ref_qp, &qp_list);
+
        spin_unlock_irq(&ctx->dev->lock);
+
+       /* now traverse the list in a safe context to recover the db state*/
+       recover_lost_dbs(ctx, &qp_list);
+
+       /* we're almost done!  deref the qps and clean up */
+       deref_qps(qp_list);
+       kfree(qp_list.qps);
+
+       /* Wait for the dbfifo to completely drain again */
+       while (cxgb4_dbfifo_count(ctx->dev->rdev.lldi.ports[0], 1) > 0) {
+               set_current_state(TASK_UNINTERRUPTIBLE);
+               schedule_timeout(usecs_to_jiffies(10));
+       }
+
+       /* resume the queues */
+       spin_lock_irq(&ctx->dev->lock);
+       if (ctx->dev->qpcnt > db_fc_threshold)
+               ctx->dev->db_state = FLOW_CONTROL;
+       else {
+               ctx->dev->db_state = NORMAL;
+               idr_for_each(&ctx->dev->qpidr, enable_qp_db, NULL);
+       }
+       ctx->dev->rdev.stats.db_state_transitions++;
+       spin_unlock_irq(&ctx->dev->lock);
+
+out:
+       /* start up kernel db ringers again */
+       mutex_unlock(&ctx->dev->db_mutex);
 }
 
 static int c4iw_uld_control(void *handle, enum cxgb4_control control, ...)
@@ -716,8 +881,7 @@ static int c4iw_uld_control(void *handle, enum cxgb4_control control, ...)
                mutex_unlock(&ctx->dev->rdev.stats.lock);
                break;
        case CXGB4_CONTROL_DB_DROP:
-               printk(KERN_WARNING MOD "%s: Fatal DB DROP\n",
-                      pci_name(ctx->lldi.pdev));
+               recover_queues(ctx);
                mutex_lock(&ctx->dev->rdev.stats.lock);
                ctx->dev->rdev.stats.db_drop++;
                mutex_unlock(&ctx->dev->rdev.stats.lock);
index e8b88a02cc7737657490ac753d92b9738c146402..6818659f2617cb2d322bf99491bbda6a7523f01f 100644 (file)
@@ -120,6 +120,7 @@ struct c4iw_stats {
        u64  db_full;
        u64  db_empty;
        u64  db_drop;
+       u64  db_state_transitions;
 };
 
 struct c4iw_rdev {
@@ -212,6 +213,7 @@ struct c4iw_dev {
        struct mutex db_mutex;
        struct dentry *debugfs_root;
        enum db_state db_state;
+       int qpcnt;
 };
 
 static inline struct c4iw_dev *to_c4iw_dev(struct ib_device *ibdev)
@@ -271,11 +273,25 @@ static inline int insert_handle_nolock(struct c4iw_dev *rhp, struct idr *idr,
        return _insert_handle(rhp, idr, handle, id, 0);
 }
 
-static inline void remove_handle(struct c4iw_dev *rhp, struct idr *idr, u32 id)
+static inline void _remove_handle(struct c4iw_dev *rhp, struct idr *idr,
+                                  u32 id, int lock)
 {
-       spin_lock_irq(&rhp->lock);
+       if (lock)
+               spin_lock_irq(&rhp->lock);
        idr_remove(idr, id);
-       spin_unlock_irq(&rhp->lock);
+       if (lock)
+               spin_unlock_irq(&rhp->lock);
+}
+
+static inline void remove_handle(struct c4iw_dev *rhp, struct idr *idr, u32 id)
+{
+       _remove_handle(rhp, idr, id, 1);
+}
+
+static inline void remove_handle_nolock(struct c4iw_dev *rhp,
+                                        struct idr *idr, u32 id)
+{
+       _remove_handle(rhp, idr, id, 0);
 }
 
 struct c4iw_pd {
@@ -843,5 +859,7 @@ void c4iw_ev_dispatch(struct c4iw_dev *dev, struct t4_cqe *err_cqe);
 extern struct cxgb4_client t4c_client;
 extern c4iw_handler_func c4iw_handlers[NUM_CPL_CMDS];
 extern int c4iw_max_read_depth;
+extern int db_fc_threshold;
+
 
 #endif
index beec66758aec252052b851f898ab4dd10c8991da..ba1343ee1414fb56fab030f27fda1e627e0e74b3 100644 (file)
@@ -42,6 +42,11 @@ static int ocqp_support = 1;
 module_param(ocqp_support, int, 0644);
 MODULE_PARM_DESC(ocqp_support, "Support on-chip SQs (default=1)");
 
+int db_fc_threshold = 2000;
+module_param(db_fc_threshold, int, 0644);
+MODULE_PARM_DESC(db_fc_threshold, "QP count/threshold that triggers automatic "
+                "db flow control mode (default = 2000)");
+
 static void set_state(struct c4iw_qp *qhp, enum c4iw_qp_state state)
 {
        unsigned long flag;
@@ -1143,13 +1148,19 @@ static int ring_kernel_db(struct c4iw_qp *qhp, u32 qid, u16 inc)
 
        mutex_lock(&qhp->rhp->db_mutex);
        do {
-               if (cxgb4_dbfifo_count(qhp->rhp->rdev.lldi.ports[0], 1) < 768) {
+
+               /*
+                * The interrupt threshold is dbfifo_int_thresh << 6. So
+                * make sure we don't cross that and generate an interrupt.
+                */
+               if (cxgb4_dbfifo_count(qhp->rhp->rdev.lldi.ports[0], 1) <
+                   (qhp->rhp->rdev.lldi.dbfifo_int_thresh << 5)) {
                        writel(V_QID(qid) | V_PIDX(inc), qhp->wq.db);
                        break;
                }
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(usecs_to_jiffies(delay));
-               delay = min(delay << 1, 200000);
+               delay = min(delay << 1, 2000);
        } while (1);
        mutex_unlock(&qhp->rhp->db_mutex);
        return 0;
@@ -1388,6 +1399,14 @@ out:
        return ret;
 }
 
+static int enable_qp_db(int id, void *p, void *data)
+{
+       struct c4iw_qp *qp = p;
+
+       t4_enable_wq_db(&qp->wq);
+       return 0;
+}
+
 int c4iw_destroy_qp(struct ib_qp *ib_qp)
 {
        struct c4iw_dev *rhp;
@@ -1405,7 +1424,16 @@ int c4iw_destroy_qp(struct ib_qp *ib_qp)
                c4iw_modify_qp(rhp, qhp, C4IW_QP_ATTR_NEXT_STATE, &attrs, 0);
        wait_event(qhp->wait, !qhp->ep);
 
-       remove_handle(rhp, &rhp->qpidr, qhp->wq.sq.qid);
+       spin_lock_irq(&rhp->lock);
+       remove_handle_nolock(rhp, &rhp->qpidr, qhp->wq.sq.qid);
+       rhp->qpcnt--;
+       BUG_ON(rhp->qpcnt < 0);
+       if (rhp->qpcnt <= db_fc_threshold && rhp->db_state == FLOW_CONTROL) {
+               rhp->rdev.stats.db_state_transitions++;
+               rhp->db_state = NORMAL;
+               idr_for_each(&rhp->qpidr, enable_qp_db, NULL);
+       }
+       spin_unlock_irq(&rhp->lock);
        atomic_dec(&qhp->refcnt);
        wait_event(qhp->wait, !atomic_read(&qhp->refcnt));
 
@@ -1419,6 +1447,14 @@ int c4iw_destroy_qp(struct ib_qp *ib_qp)
        return 0;
 }
 
+static int disable_qp_db(int id, void *p, void *data)
+{
+       struct c4iw_qp *qp = p;
+
+       t4_disable_wq_db(&qp->wq);
+       return 0;
+}
+
 struct ib_qp *c4iw_create_qp(struct ib_pd *pd, struct ib_qp_init_attr *attrs,
                             struct ib_udata *udata)
 {
@@ -1508,6 +1544,11 @@ struct ib_qp *c4iw_create_qp(struct ib_pd *pd, struct ib_qp_init_attr *attrs,
        spin_lock_irq(&rhp->lock);
        if (rhp->db_state != NORMAL)
                t4_disable_wq_db(&qhp->wq);
+       if (++rhp->qpcnt > db_fc_threshold && rhp->db_state == NORMAL) {
+               rhp->rdev.stats.db_state_transitions++;
+               rhp->db_state = FLOW_CONTROL;
+               idr_for_each(&rhp->qpidr, disable_qp_db, NULL);
+       }
        ret = insert_handle_nolock(rhp, &rhp->qpidr, qhp, qhp->wq.sq.qid);
        spin_unlock_irq(&rhp->lock);
        if (ret)
index c0221eec8817b67884a831583b4c8035eba10135..16f26ab293022f797a4182ff179961619bad3382 100644 (file)
@@ -62,6 +62,10 @@ struct t4_status_page {
        __be16 pidx;
        u8 qp_err;      /* flit 1 - sw owns */
        u8 db_off;
+       u8 pad;
+       u16 host_wq_pidx;
+       u16 host_cidx;
+       u16 host_pidx;
 };
 
 #define T4_EQ_ENTRY_SIZE 64
@@ -375,6 +379,16 @@ static inline void t4_rq_consume(struct t4_wq *wq)
                wq->rq.cidx = 0;
 }
 
+static inline u16 t4_rq_host_wq_pidx(struct t4_wq *wq)
+{
+       return wq->rq.queue[wq->rq.size].status.host_wq_pidx;
+}
+
+static inline u16 t4_rq_wq_size(struct t4_wq *wq)
+{
+               return wq->rq.size * T4_RQ_NUM_SLOTS;
+}
+
 static inline int t4_sq_onchip(struct t4_sq *sq)
 {
        return sq->flags & T4_SQ_ONCHIP;
@@ -412,6 +426,16 @@ static inline void t4_sq_consume(struct t4_wq *wq)
                wq->sq.cidx = 0;
 }
 
+static inline u16 t4_sq_host_wq_pidx(struct t4_wq *wq)
+{
+       return wq->sq.queue[wq->sq.size].status.host_wq_pidx;
+}
+
+static inline u16 t4_sq_wq_size(struct t4_wq *wq)
+{
+               return wq->sq.size * T4_SQ_NUM_SLOTS;
+}
+
 static inline void t4_ring_sq_db(struct t4_wq *wq, u16 inc)
 {
        wmb();