ceph: reset osd after relevant messages timed out
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 26 Feb 2010 23:32:31 +0000 (15:32 -0800)
committerSage Weil <sage@newdream.net>
Thu, 4 Mar 2010 19:26:35 +0000 (11:26 -0800)
This simplifies the process of timing out messages. We
keep lru of current messages that are in flight. If a
timeout has passed, we reset the osd connection, so that
messages will be retransmitted.  This is a failsafe in case
we hit some sort of problem sending out message to the OSD.
Normally, we'll get notification via an updated osdmap if
there are problems.

If a request is older than the keepalive timeout, send a
keepalive to ensure we detect any breaks in the TCP connection.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/osd_client.c
fs/ceph/osd_client.h
fs/ceph/super.c
fs/ceph/super.h

index c4763bff97b42de488d134dfdcb4a581202661e5..dbe63db9762fc11048bbe5f3bcb37e0aad7f3e2c 100644 (file)
@@ -17,6 +17,8 @@
 #define OSD_OPREPLY_FRONT_LEN  512
 
 const static struct ceph_connection_operations osd_con_ops;
+static int __kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd);
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
@@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        osd->o_con.ops = &osd_con_ops;
        osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 
+       INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
 }
 
@@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
        return NULL;
 }
 
+static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
+{
+       schedule_delayed_work(&osdc->timeout_work,
+                       osdc->client->mount_args->osd_keepalive_timeout * HZ);
+}
+
+static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
+{
+       cancel_delayed_work(&osdc->timeout_work);
+}
 
 /*
  * Register request, assign tid.  If this is the first request, set up
@@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc,
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+       INIT_LIST_HEAD(&req->r_req_lru_item);
 
        dout("register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
 
-       req->r_timeout_stamp =
-               jiffies + osdc->client->mount_args->osd_timeout*HZ;
-
        if (osdc->num_requests == 1) {
-               osdc->timeout_tid = req->r_tid;
-               dout("  timeout on tid %llu at %lu\n", req->r_tid,
-                    req->r_timeout_stamp);
-               schedule_delayed_work(&osdc->timeout_work,
-                     round_jiffies_relative(req->r_timeout_stamp - jiffies));
+               dout(" first request, scheduling timeout\n");
+               __schedule_osd_timeout(osdc);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
        ceph_osdc_put_request(req);
 
-       if (req->r_tid == osdc->timeout_tid) {
-               if (osdc->num_requests == 0) {
-                       dout("no requests, canceling timeout\n");
-                       osdc->timeout_tid = 0;
-                       cancel_delayed_work(&osdc->timeout_work);
-               } else {
-                       req = rb_entry(rb_first(&osdc->requests),
-                                      struct ceph_osd_request, r_node);
-                       osdc->timeout_tid = req->r_tid;
-                       dout("rescheduled timeout on tid %llu at %lu\n",
-                            req->r_tid, req->r_timeout_stamp);
-                       schedule_delayed_work(&osdc->timeout_work,
-                             round_jiffies_relative(req->r_timeout_stamp -
-                                                    jiffies));
-               }
+       list_del_init(&req->r_req_lru_item);
+       if (osdc->num_requests == 0) {
+               dout(" no requests, canceling timeout\n");
+               __cancel_osd_timeout(osdc);
        }
 }
 
@@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req)
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
                req->r_sent = 0;
        }
+       list_del_init(&req->r_req_lru_item);
 }
 
 /*
@@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc,
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
+       req->r_sent_stamp = jiffies;
+       list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *last_req = NULL;
        struct ceph_osd *osd;
        unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
-       unsigned long next_timeout = timeout + jiffies;
+       unsigned long keepalive =
+               osdc->client->mount_args->osd_keepalive_timeout * HZ;
+       unsigned long last_sent = 0;
        struct rb_node *p;
+       struct list_head slow_osds;
 
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work)
                        continue;
                }
        }
-       for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
-               osd = rb_entry(p, struct ceph_osd, o_node);
-               if (list_empty(&osd->o_requests))
-                       continue;
-               req = list_first_entry(&osd->o_requests,
-                                      struct ceph_osd_request, r_osd_item);
-               if (time_before(jiffies, req->r_timeout_stamp))
-                       continue;
 
-               dout(" tid %llu (at least) timed out on osd%d\n",
+       /*
+        * reset osds that appear to be _really_ unresponsive.  this
+        * is a failsafe measure.. we really shouldn't be getting to
+        * this point if the system is working properly.  the monitors
+        * should mark the osd as failed and we should find out about
+        * it from an updated osd map.
+        */
+       while (!list_empty(&osdc->req_lru)) {
+               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
+                                r_req_lru_item);
+
+               if (time_before(jiffies, req->r_sent_stamp + timeout))
+                       break;
+
+               BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
+               last_req = req;
+               last_sent = req->r_sent_stamp;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
+                          req->r_tid, osd->o_osd);
+               __kick_requests(osdc, osd);
+       }
+
+       /*
+        * ping osds that are a bit slow.  this ensures that if there
+        * is a break in the TCP connection we will notice, and reopen
+        * a connection with that osd (from the fault callback).
+        */
+       INIT_LIST_HEAD(&slow_osds);
+       list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
+               if (time_before(jiffies, req->r_sent_stamp + keepalive))
+                       break;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               dout(" tid %llu is slow, will send keepalive on osd%d\n",
                     req->r_tid, osd->o_osd);
-               req->r_timeout_stamp = next_timeout;
+               list_move_tail(&osd->o_keepalive_item, &slow_osds);
+       }
+       while (!list_empty(&slow_osds)) {
+               osd = list_entry(slow_osds.next, struct ceph_osd,
+                                o_keepalive_item);
+               list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }
 
-       if (osdc->timeout_tid)
-               schedule_delayed_work(&osdc->timeout_work,
-                                     round_jiffies_relative(timeout));
-
+       __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
 
        up_read(&osdc->map_sem);
@@ -819,18 +852,7 @@ bad:
 }
 
 
-/*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
+static int __kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *kickosd)
 {
        struct ceph_osd_request *req;
@@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
        int err;
 
        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-       mutex_lock(&osdc->request_mutex);
        if (kickosd) {
                __reset_osd(osdc, kickosd);
        } else {
@@ -900,14 +921,36 @@ kick:
                        req->r_resend = true;
                }
        }
+
+       return needmap;
+}
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * Close connections to down osds.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd)
+{
+       int needmap;
+
+       mutex_lock(&osdc->request_mutex);
+       needmap = __kick_requests(osdc, kickosd);
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-}
 
+}
 /*
  * Process updated osd map.
  *
@@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        init_completion(&osdc->map_waiters);
        osdc->last_requested_map = 0;
        mutex_init(&osdc->request_mutex);
-       osdc->timeout_tid = 0;
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
        INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
+       INIT_LIST_HEAD(&osdc->req_lru);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
index f256eba6fe7aa76da74a12cae48ba0b69ca66a61..1b1a3ca43afc3ab0a4fab62bbba065603358e62f 100644 (file)
@@ -36,12 +36,15 @@ struct ceph_osd {
        void *o_authorizer_buf, *o_authorizer_reply_buf;
        size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
        unsigned long lru_ttl;
+       int o_marked_for_keepalive;
+       struct list_head o_keepalive_item;
 };
 
 /* an in-flight request */
 struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
        struct rb_node  r_node;
+       struct list_head r_req_lru_item;
        struct list_head r_osd_item;
        struct ceph_osd *r_osd;
        struct ceph_pg   r_pgid;
@@ -67,7 +70,7 @@ struct ceph_osd_request {
 
        char              r_oid[40];          /* object name */
        int               r_oid_len;
-       unsigned long     r_timeout_stamp;
+       unsigned long     r_sent_stamp;
        bool              r_resend;           /* msg send failed, needs retry */
 
        struct ceph_file_layout r_file_layout;
@@ -92,6 +95,7 @@ struct ceph_osd_client {
        u64                    timeout_tid;   /* tid of timeout triggering rq */
        u64                    last_tid;      /* tid of last request */
        struct rb_root         requests;      /* pending requests */
+       struct list_head       req_lru;       /* pending requests lru */
        int                    num_requests;
        struct delayed_work    timeout_work;
        struct delayed_work    osds_timeout_work;
index 74953be75f8f135e55fcd39f354847df1c51e167..4290a6e860b0591d509aeab9c79459c9480855cb 100644 (file)
@@ -292,6 +292,7 @@ enum {
        Opt_wsize,
        Opt_rsize,
        Opt_osdtimeout,
+       Opt_osdkeepalivetimeout,
        Opt_mount_timeout,
        Opt_osd_idle_ttl,
        Opt_caps_wanted_delay_min,
@@ -322,6 +323,7 @@ static match_table_t arg_tokens = {
        {Opt_wsize, "wsize=%d"},
        {Opt_rsize, "rsize=%d"},
        {Opt_osdtimeout, "osdtimeout=%d"},
+       {Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
        {Opt_mount_timeout, "mount_timeout=%d"},
        {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
        {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
@@ -367,7 +369,8 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
        /* start with defaults */
        args->sb_flags = flags;
        args->flags = CEPH_OPT_DEFAULT;
-       args->osd_timeout = 5;    /* seconds */
+       args->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
+       args->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
        args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
        args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
        args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
@@ -468,6 +471,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
                case Opt_osdtimeout:
                        args->osd_timeout = intval;
                        break;
+               case Opt_osdkeepalivetimeout:
+                       args->osd_keepalive_timeout = intval;
+                       break;
                case Opt_mount_timeout:
                        args->mount_timeout = intval;
                        break;
index 6a778f2c3f6e8f37dac60e693ed11d66337b03e5..02c0ddcf3eaf783dd0f202bc3fb3ca3c5eeb25b0 100644 (file)
@@ -62,6 +62,7 @@ struct ceph_mount_args {
        int max_readdir;      /* max readdir size */
        int congestion_kb;      /* max readdir size */
        int osd_timeout;
+       int osd_keepalive_timeout;
        char *snapdir_name;   /* default ".snap" */
        char *name;
        char *secret;
@@ -72,6 +73,8 @@ struct ceph_mount_args {
  * defaults
  */
 #define CEPH_MOUNT_TIMEOUT_DEFAULT  60
+#define CEPH_OSD_TIMEOUT_DEFAULT    60  /* seconds */
+#define CEPH_OSD_KEEPALIVE_DEFAULT  5
 #define CEPH_OSD_IDLE_TTL_DEFAULT    60
 #define CEPH_MOUNT_RSIZE_DEFAULT    (512*1024) /* readahead */