sctp: fix possible seqlock deadlock in sctp_packet_transmit()
[firefly-linux-kernel-4.4.55.git] / net / ceph / osd_client.c
index d5953b87918c072daaa1427187246f9f9cfcad3e..3663a305daf71f255161bde0d3d44ab9cef4d96c 100644 (file)
@@ -733,12 +733,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        object_size = le32_to_cpu(layout->fl_object_size);
        object_base = off - objoff;
-       if (truncate_size <= object_base) {
-               truncate_size = 0;
-       } else {
-               truncate_size -= object_base;
-               if (truncate_size > object_size)
-                       truncate_size = object_size;
+       if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
+               if (truncate_size <= object_base) {
+                       truncate_size = 0;
+               } else {
+                       truncate_size -= object_base;
+                       if (truncate_size > object_size)
+                               truncate_size = object_size;
+               }
        }
 
        osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
@@ -1174,6 +1176,7 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
                                    struct ceph_osd_request *req)
 {
        dout("__register_linger_request %p\n", req);
+       ceph_osdc_get_request(req);
        list_add_tail(&req->r_linger_item, &osdc->req_linger);
        if (req->r_osd)
                list_add_tail(&req->r_linger_osd,
@@ -1196,6 +1199,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
                if (list_empty(&req->r_osd_item))
                        req->r_osd = NULL;
        }
+       ceph_osdc_put_request(req);
 }
 
 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
@@ -1203,9 +1207,8 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
 {
        mutex_lock(&osdc->request_mutex);
        if (req->r_linger) {
-               __unregister_linger_request(osdc, req);
                req->r_linger = 0;
-               ceph_osdc_put_request(req);
+               __unregister_linger_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -1217,15 +1220,26 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
        if (!req->r_linger) {
                dout("set_request_linger %p\n", req);
                req->r_linger = 1;
-               /*
-                * caller is now responsible for calling
-                * unregister_linger_request
-                */
-               ceph_osdc_get_request(req);
        }
 }
 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
 
+/*
+ * Returns whether a request should be blocked from being sent
+ * based on the current osdmap and osd_client settings.
+ *
+ * Caller should hold map_sem for read.
+ */
+static bool __req_should_be_paused(struct ceph_osd_client *osdc,
+                                  struct ceph_osd_request *req)
+{
+       bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+       bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+       return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
+               (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
+}
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -1243,6 +1257,7 @@ static int __map_request(struct ceph_osd_client *osdc,
        int acting[CEPH_PG_MAX_SIZE];
        int o = -1, num = 0;
        int err;
+       bool was_paused;
 
        dout("map_request %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
@@ -1259,12 +1274,18 @@ static int __map_request(struct ceph_osd_client *osdc,
                num = err;
        }
 
+       was_paused = req->r_paused;
+       req->r_paused = __req_should_be_paused(osdc, req);
+       if (was_paused && !req->r_paused)
+               force_resend = 1;
+
        if ((!force_resend &&
             req->r_osd && req->r_osd->o_osd == o &&
             req->r_sent >= req->r_osd->o_incarnation &&
             req->r_num_pg_osds == num &&
             memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
-           (req->r_osd == NULL && o == -1))
+           (req->r_osd == NULL && o == -1) ||
+           req->r_paused)
                return 0;  /* no change */
 
        dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
@@ -1339,10 +1360,6 @@ static void __send_request(struct ceph_osd_client *osdc,
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
 
-       /* Mark the request unsafe if this is the first timet's being sent. */
-
-       if (!req->r_sent && req->r_unsafe_callback)
-               req->r_unsafe_callback(req, true);
        req->r_sent = req->r_osd->o_incarnation;
 
        ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -1433,8 +1450,6 @@ static void handle_osds_timeout(struct work_struct *work)
 
 static void complete_request(struct ceph_osd_request *req)
 {
-       if (req->r_unsafe_callback)
-               req->r_unsafe_callback(req, false);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
 
@@ -1496,14 +1511,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
             req, result);
 
-       ceph_decode_need(&p, end, 4, bad);
+       ceph_decode_need(&p, end, 4, bad_put);
        numops = ceph_decode_32(&p);
        if (numops > CEPH_OSD_MAX_OP)
                goto bad_put;
        if (numops != req->r_num_ops)
                goto bad_put;
        payload_len = 0;
-       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
        for (i = 0; i < numops; i++) {
                struct ceph_osd_op *op = p;
                int len;
@@ -1521,11 +1536,13 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                goto bad_put;
        }
 
-       ceph_decode_need(&p, end, 4 + numops * 4, bad);
+       ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
        retry_attempt = ceph_decode_32(&p);
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
 
+       already_completed = req->r_got_reply;
+
        if (!req->r_got_reply) {
 
                req->r_result = result;
@@ -1556,19 +1573,23 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                __unregister_request(osdc, req);
 
-       already_completed = req->r_completed;
-       req->r_completed = 1;
        mutex_unlock(&osdc->request_mutex);
-       if (already_completed)
-               goto done;
 
-       if (req->r_callback)
-               req->r_callback(req, msg);
-       else
-               complete_all(&req->r_completion);
+       if (!already_completed) {
+               if (req->r_unsafe_callback &&
+                   result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
+                       req->r_unsafe_callback(req, true);
+               if (req->r_callback)
+                       req->r_callback(req, msg);
+               else
+                       complete_all(&req->r_completion);
+       }
 
-       if (flags & CEPH_OSD_FLAG_ONDISK)
+       if (flags & CEPH_OSD_FLAG_ONDISK) {
+               if (req->r_unsafe_callback && already_completed)
+                       req->r_unsafe_callback(req, false);
                complete_request(req);
+       }
 
 done:
        dout("req=%p req->r_linger=%d\n", req, req->r_linger);
@@ -1608,14 +1629,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  *
  * Caller should hold map_sem for read.
  */
-static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
+static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
+                         bool force_resend_writes)
 {
        struct ceph_osd_request *req, *nreq;
        struct rb_node *p;
        int needmap = 0;
        int err;
+       bool force_resend_req;
 
-       dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
+       dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
+               force_resend_writes ? " (force resend writes)" : "");
        mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; ) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1633,12 +1657,17 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                        dout("%p tid %llu restart on osd%d\n",
                             req, req->r_tid,
                             req->r_osd ? req->r_osd->o_osd : -1);
+                       ceph_osdc_get_request(req);
                        __unregister_request(osdc, req);
                        __register_linger_request(osdc, req);
+                       ceph_osdc_put_request(req);
                        continue;
                }
 
-               err = __map_request(osdc, req, force_resend);
+               force_resend_req = force_resend ||
+                       (force_resend_writes &&
+                               req->r_flags & CEPH_OSD_FLAG_WRITE);
+               err = __map_request(osdc, req, force_resend_req);
                if (err < 0)
                        continue;  /* error */
                if (req->r_osd == NULL) {
@@ -1658,7 +1687,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                                 r_linger_item) {
                dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
-               err = __map_request(osdc, req, force_resend);
+               err = __map_request(osdc, req,
+                                   force_resend || force_resend_writes);
                dout("__map_request returned %d\n", err);
                if (err == 0)
                        continue;  /* no change and no osd was specified */
@@ -1675,13 +1705,13 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                __register_request(osdc, req);
                __unregister_linger_request(osdc, req);
        }
+       reset_changed_osds(osdc);
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-       reset_changed_osds(osdc);
 }
 
 
@@ -1700,6 +1730,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        struct ceph_osdmap *newmap = NULL, *oldmap;
        int err;
        struct ceph_fsid fsid;
+       bool was_full;
 
        dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
        p = msg->front.iov_base;
@@ -1713,6 +1744,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
        down_write(&osdc->map_sem);
 
+       was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+
        /* incremental maps */
        ceph_decode_32_safe(&p, end, nr_maps, bad);
        dout(" %d inc maps\n", nr_maps);
@@ -1737,7 +1770,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                ceph_osdmap_destroy(osdc->osdmap);
                                osdc->osdmap = newmap;
                        }
-                       kick_requests(osdc, 0);
+                       was_full = was_full ||
+                               ceph_osdmap_flag(osdc->osdmap,
+                                                CEPH_OSDMAP_FULL);
+                       kick_requests(osdc, 0, was_full);
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
@@ -1780,12 +1816,17 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                        skipped_map = 1;
                                ceph_osdmap_destroy(oldmap);
                        }
-                       kick_requests(osdc, skipped_map);
+                       was_full = was_full ||
+                               ceph_osdmap_flag(osdc->osdmap,
+                                                CEPH_OSDMAP_FULL);
+                       kick_requests(osdc, skipped_map, was_full);
                }
                p += maplen;
                nr_maps--;
        }
 
+       if (!osdc->osdmap)
+               goto bad;
 done:
        downgrade_write(&osdc->map_sem);
        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
@@ -1795,7 +1836,9 @@ done:
         * we find out when we are no longer full and stop returning
         * ENOSPC.
         */
-       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
+       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
                ceph_monc_request_next_osdmap(&osdc->client->monc);
 
        mutex_lock(&osdc->request_mutex);
@@ -2123,13 +2166,14 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
        __register_request(osdc, req);
        req->r_sent = 0;
        req->r_got_reply = 0;
-       req->r_completed = 0;
        rc = __map_request(osdc, req, 0);
        if (rc < 0) {
                if (nofail) {
                        dout("osdc_start_request failed map, "
                                " will retry %lld\n", req->r_tid);
                        rc = 0;
+               } else {
+                       __unregister_request(osdc, req);
                }
                goto out_unlock;
        }
@@ -2205,6 +2249,17 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc)
 }
 EXPORT_SYMBOL(ceph_osdc_sync);
 
+/*
+ * Call all pending notify callbacks - for use after a watch is
+ * unregistered, to make sure no more callbacks for it will be invoked
+ */
+void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
+{
+       flush_workqueue(osdc->notify_wq);
+}
+EXPORT_SYMBOL(ceph_osdc_flush_notifies);
+
+
 /*
  * init, shutdown
  */
@@ -2254,12 +2309,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        if (err < 0)
                goto out_msgpool;
 
+       err = -ENOMEM;
        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
-       if (IS_ERR(osdc->notify_wq)) {
-               err = PTR_ERR(osdc->notify_wq);
-               osdc->notify_wq = NULL;
+       if (!osdc->notify_wq)
                goto out_msgpool;
-       }
        return 0;
 
 out_msgpool: