Merge branch 'drbd-8.4_ed6' into for-3.8-drivers-drbd-8.4_ed6
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 8b99f4e28ccc2cdc5ff03df07aad7853fda73eaa..52de26daa1f6c4dceed2115dac702aa431d2d289 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -105,8 +105,8 @@ module_param(fault_devs, int, 0644);
 
 /* module parameter, defined */
 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
-int disable_sendpage;
-int allow_oos;
+bool disable_sendpage;
+bool allow_oos;
 int proc_details;       /* Detail level in proc drbd*/
 
 /* Module parameter for setting the user mode helper program
@@ -149,11 +149,6 @@ static const struct block_device_operations drbd_ops = {
        .release = drbd_release,
 };
 
-static void bio_destructor_drbd(struct bio *bio)
-{
-       bio_free(bio, drbd_md_io_bio_set);
-}
-
 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
 {
        struct bio *bio;
@@ -164,7 +159,6 @@ struct bio *bio_alloc_drbd(gfp_t gfp_mask)
        bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
        if (!bio)
                return NULL;
-       bio->bi_destructor = bio_destructor_drbd;
        return bio;
 }
 
@@ -188,152 +182,81 @@ int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
 #endif
 
 /**
- * DOC: The transfer log
- *
- * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
- * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
- * of the list. There is always at least one &struct drbd_tl_epoch object.
- *
- * Each &struct drbd_tl_epoch has a circular double linked list of requests
- * attached.
- */
-static int tl_init(struct drbd_tconn *tconn)
-{
-       struct drbd_tl_epoch *b;
-
-       /* during device minor initialization, we may well use GFP_KERNEL */
-       b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
-       if (!b)
-               return 0;
-       INIT_LIST_HEAD(&b->requests);
-       INIT_LIST_HEAD(&b->w.list);
-       b->next = NULL;
-       b->br_number = 4711;
-       b->n_writes = 0;
-       b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-
-       tconn->oldest_tle = b;
-       tconn->newest_tle = b;
-       INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
-       INIT_LIST_HEAD(&tconn->barrier_acked_requests);
-
-       return 1;
-}
-
-static void tl_cleanup(struct drbd_tconn *tconn)
-{
-       if (tconn->oldest_tle != tconn->newest_tle)
-               conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
-       if (!list_empty(&tconn->out_of_sequence_requests))
-               conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
-       kfree(tconn->oldest_tle);
-       tconn->oldest_tle = NULL;
-       kfree(tconn->unused_spare_tle);
-       tconn->unused_spare_tle = NULL;
-}
-
-/**
- * _tl_add_barrier() - Adds a barrier to the transfer log
- * @mdev:      DRBD device.
- * @new:       Barrier to be added before the current head of the TL.
- *
- * The caller must hold the req_lock.
- */
-void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
-{
-       struct drbd_tl_epoch *newest_before;
-
-       INIT_LIST_HEAD(&new->requests);
-       INIT_LIST_HEAD(&new->w.list);
-       new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-       new->next = NULL;
-       new->n_writes = 0;
-
-       newest_before = tconn->newest_tle;
-       /* never send a barrier number == 0, because that is special-cased
-        * when using TCQ for our write ordering code */
-       new->br_number = (newest_before->br_number+1) ?: 1;
-       if (tconn->newest_tle != new) {
-               tconn->newest_tle->next = new;
-               tconn->newest_tle = new;
-       }
-}
-
-/**
- * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
- * @mdev:      DRBD device.
+ * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
+ * @tconn:     DRBD connection.
  * @barrier_nr:        Expected identifier of the DRBD write barrier packet.
  * @set_size:  Expected number of requests before that barrier.
  *
  * In case the passed barrier_nr or set_size does not match the oldest
- * &struct drbd_tl_epoch objects this function will cause a termination
- * of the connection.
+ * epoch of not yet barrier-acked requests, this function will cause a
+ * termination of the connection.
  */
 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
                unsigned int set_size)
 {
-       struct drbd_conf *mdev;
-       struct drbd_tl_epoch *b, *nob; /* next old barrier */
-       struct list_head *le, *tle;
        struct drbd_request *r;
+       struct drbd_request *req = NULL;
+       int expect_epoch = 0;
+       int expect_size = 0;
 
        spin_lock_irq(&tconn->req_lock);
 
-       b = tconn->oldest_tle;
+       /* find oldest not yet barrier-acked write request,
+        * count writes in its epoch. */
+       list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+               const unsigned s = r->rq_state;
+               if (!req) {
+                       if (!(s & RQ_WRITE))
+                               continue;
+                       if (!(s & RQ_NET_MASK))
+                               continue;
+                       if (s & RQ_NET_DONE)
+                               continue;
+                       req = r;
+                       expect_epoch = req->epoch;
+                       expect_size++;
+               } else {
+                       if (r->epoch != expect_epoch)
+                               break;
+                       if (!(s & RQ_WRITE))
+                               continue;
+                       /* if (s & RQ_DONE): not expected */
+                       /* if (!(s & RQ_NET_MASK)): not expected */
+                       expect_size++;
+               }
+       }
 
        /* first some paranoia code */
-       if (b == NULL) {
+       if (req == NULL) {
                conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
                         barrier_nr);
                goto bail;
        }
-       if (b->br_number != barrier_nr) {
+       if (expect_epoch != barrier_nr) {
                conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
-                        barrier_nr, b->br_number);
+                        barrier_nr, expect_epoch);
                goto bail;
        }
-       if (b->n_writes != set_size) {
+
+       if (expect_size != set_size) {
                conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
-                        barrier_nr, set_size, b->n_writes);
+                        barrier_nr, set_size, expect_size);
                goto bail;
        }
 
-       /* Clean up list of requests processed during current epoch */
-       list_for_each_safe(le, tle, &b->requests) {
-               r = list_entry(le, struct drbd_request, tl_requests);
-               _req_mod(r, BARRIER_ACKED);
-       }
-       /* There could be requests on the list waiting for completion
-          of the write to the local disk. To avoid corruptions of
-          slab's data structures we have to remove the lists head.
-
-          Also there could have been a barrier ack out of sequence, overtaking
-          the write acks - which would be a bug and violating write ordering.
-          To not deadlock in case we lose connection while such requests are
-          still pending, we need some way to find them for the
-          _req_mode(CONNECTION_LOST_WHILE_PENDING).
-
-          These have been list_move'd to the out_of_sequence_requests list in
-          _req_mod(, BARRIER_ACKED) above.
-          */
-       list_splice_init(&b->requests, &tconn->barrier_acked_requests);
-       mdev = b->w.mdev;
-
-       nob = b->next;
-       if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
-               _tl_add_barrier(tconn, b);
-               if (nob)
-                       tconn->oldest_tle = nob;
-               /* if nob == NULL b was the only barrier, and becomes the new
-                  barrier. Therefore tconn->oldest_tle points already to b */
-       } else {
-               D_ASSERT(nob != NULL);
-               tconn->oldest_tle = nob;
-               kfree(b);
+       /* Clean up list of requests processed during current epoch. */
+       /* this extra list walk restart is paranoia,
+        * to catch requests being barrier-acked "unexpectedly".
+        * It usually should find the same req again, or some READ preceding it. */
+       list_for_each_entry(req, &tconn->transfer_log, tl_requests)
+               if (req->epoch == expect_epoch)
+                       break;
+       list_for_each_entry_safe_from(req, r, &tconn->transfer_log, tl_requests) {
+               if (req->epoch != expect_epoch)
+                       break;
+               _req_mod(req, BARRIER_ACKED);
        }
-
        spin_unlock_irq(&tconn->req_lock);
-       dec_ap_pending(mdev);
 
        return;
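
The epoch-selection logic in the loop above boils down to a single test on rq_state. Purely as an illustration (the helper below is hypothetical; only the flag names are taken from the hunk above), the request that opens the oldest not-yet-barrier-acked epoch is the first one for which:

        /* Illustration only: would this request be picked as the start of the
         * oldest not-yet-barrier-acked write epoch in tl_release() above? */
        static inline bool starts_unacked_epoch(const struct drbd_request *r)
        {
                const unsigned s = r->rq_state;

                return (s & RQ_WRITE) &&        /* only writes are counted          */
                       (s & RQ_NET_MASK) &&     /* only requests sent over the wire */
                      !(s & RQ_NET_DONE);       /* and not yet done on the network  */
        }

Every later request with the same req->epoch then adds one to expect_size, which has to match the set_size announced in the barrier-ack packet, or the connection is torn down.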
 
@@ -351,91 +274,20 @@ bail:
  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
  * RESTART_FROZEN_DISK_IO.
  */
+/* must hold resource->req_lock */
 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
 {
-       struct drbd_tl_epoch *b, *tmp, **pn;
-       struct list_head *le, *tle, carry_reads;
-       struct drbd_request *req;
-       int rv, n_writes, n_reads;
-
-       b = tconn->oldest_tle;
-       pn = &tconn->oldest_tle;
-       while (b) {
-               n_writes = 0;
-               n_reads = 0;
-               INIT_LIST_HEAD(&carry_reads);
-               list_for_each_safe(le, tle, &b->requests) {
-                       req = list_entry(le, struct drbd_request, tl_requests);
-                       rv = _req_mod(req, what);
-
-                       if (rv & MR_WRITE)
-                               n_writes++;
-                       if (rv & MR_READ)
-                               n_reads++;
-               }
-               tmp = b->next;
-
-               if (n_writes) {
-                       if (what == RESEND) {
-                               b->n_writes = n_writes;
-                               if (b->w.cb == NULL) {
-                                       b->w.cb = w_send_barrier;
-                                       inc_ap_pending(b->w.mdev);
-                                       set_bit(CREATE_BARRIER, &tconn->flags);
-                               }
-
-                               drbd_queue_work(&tconn->data.work, &b->w);
-                       }
-                       pn = &b->next;
-               } else {
-                       if (n_reads)
-                               list_add(&carry_reads, &b->requests);
-                       /* there could still be requests on that ring list,
-                        * in case local io is still pending */
-                       list_del(&b->requests);
-
-                       /* dec_ap_pending corresponding to queue_barrier.
-                        * the newest barrier may not have been queued yet,
-                        * in which case w.cb is still NULL. */
-                       if (b->w.cb != NULL)
-                               dec_ap_pending(b->w.mdev);
-
-                       if (b == tconn->newest_tle) {
-                               /* recycle, but reinit! */
-                               if (tmp != NULL)
-                                       conn_err(tconn, "ASSERT FAILED tmp == NULL");
-                               INIT_LIST_HEAD(&b->requests);
-                               list_splice(&carry_reads, &b->requests);
-                               INIT_LIST_HEAD(&b->w.list);
-                               b->w.cb = NULL;
-                               b->br_number = net_random();
-                               b->n_writes = 0;
-
-                               *pn = b;
-                               break;
-                       }
-                       *pn = tmp;
-                       kfree(b);
-               }
-               b = tmp;
-               list_splice(&carry_reads, &b->requests);
-       }
-
-       /* Actions operating on the disk state, also want to work on
-          requests that got barrier acked. */
-       switch (what) {
-       case FAIL_FROZEN_DISK_IO:
-       case RESTART_FROZEN_DISK_IO:
-               list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
-                       req = list_entry(le, struct drbd_request, tl_requests);
-                       _req_mod(req, what);
-               }
-       case CONNECTION_LOST_WHILE_PENDING:
-       case RESEND:
-               break;
-       default:
-               conn_err(tconn, "what = %d in _tl_restart()\n", what);
-       }
+       struct drbd_request *req, *r;
+
+       list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests)
+               _req_mod(req, what);
+}
+
+void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
+{
+       spin_lock_irq(&tconn->req_lock);
+       _tl_restart(tconn, what);
+       spin_unlock_irq(&tconn->req_lock);
 }
 
 /**
@@ -448,36 +300,7 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
  */
 void tl_clear(struct drbd_tconn *tconn)
 {
-       struct list_head *le, *tle;
-       struct drbd_request *r;
-
-       spin_lock_irq(&tconn->req_lock);
-
-       _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
-
-       /* we expect this list to be empty. */
-       if (!list_empty(&tconn->out_of_sequence_requests))
-               conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
-
-       /* but just in case, clean it up anyways! */
-       list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
-               r = list_entry(le, struct drbd_request, tl_requests);
-               /* It would be nice to complete outside of spinlock.
-                * But this is easier for now. */
-               _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
-       }
-
-       /* ensure bit indicating barrier is required is clear */
-       clear_bit(CREATE_BARRIER, &tconn->flags);
-
-       spin_unlock_irq(&tconn->req_lock);
-}
-
-void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
-{
-       spin_lock_irq(&tconn->req_lock);
-       _tl_restart(tconn, what);
-       spin_unlock_irq(&tconn->req_lock);
+       tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
 }
 
 /**
@@ -487,31 +310,16 @@ void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
 void tl_abort_disk_io(struct drbd_conf *mdev)
 {
        struct drbd_tconn *tconn = mdev->tconn;
-       struct drbd_tl_epoch *b;
-       struct list_head *le, *tle;
-       struct drbd_request *req;
+       struct drbd_request *req, *r;
 
        spin_lock_irq(&tconn->req_lock);
-       b = tconn->oldest_tle;
-       while (b) {
-               list_for_each_safe(le, tle, &b->requests) {
-                       req = list_entry(le, struct drbd_request, tl_requests);
-                       if (!(req->rq_state & RQ_LOCAL_PENDING))
-                               continue;
-                       if (req->w.mdev == mdev)
-                               _req_mod(req, ABORT_DISK_IO);
-               }
-               b = b->next;
-       }
-
-       list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
-               req = list_entry(le, struct drbd_request, tl_requests);
+       list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
                if (!(req->rq_state & RQ_LOCAL_PENDING))
                        continue;
-               if (req->w.mdev == mdev)
-                       _req_mod(req, ABORT_DISK_IO);
+               if (req->w.mdev != mdev)
+                       continue;
+               _req_mod(req, ABORT_DISK_IO);
        }
-
        spin_unlock_irq(&tconn->req_lock);
 }
 
@@ -1030,8 +838,10 @@ int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
                put_ldev(mdev);
                return -EIO;
        }
+       spin_lock_irq(&mdev->ldev->md.uuid_lock);
        for (i = UI_CURRENT; i < UI_SIZE; i++)
                p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
+       spin_unlock_irq(&mdev->ldev->md.uuid_lock);
 
        mdev->comm_bm_set = drbd_bm_total_weight(mdev);
        p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
@@ -1104,7 +914,8 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
        struct drbd_socket *sock;
        struct p_sizes *p;
        sector_t d_size, u_size;
-       int q_order_type, max_bio_size;
+       int q_order_type;
+       unsigned int max_bio_size;
 
        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
                D_ASSERT(mdev->ldev->backing_bdev);
@@ -1114,7 +925,7 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
                rcu_read_unlock();
                q_order_type = drbd_queue_order_type(mdev);
                max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
-               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
+               max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
                put_ldev(mdev);
        } else {
                d_size = 0;
@@ -1129,9 +940,9 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
                return -EIO;
 
        if (mdev->tconn->agreed_pro_version <= 94)
-               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
+               max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
        else if (mdev->tconn->agreed_pro_version < 100)
-               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
+               max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE_P95);
 
        p->d_size = cpu_to_be64(d_size);
        p->u_size = cpu_to_be64(u_size);
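
The min_t(int, ...) to min() conversions above are enabled by max_bio_size becoming unsigned int: the kernel's min() macro type-checks its arguments (a mismatch triggers a "comparison of distinct pointer types" warning), so no forced cast type is needed once both sides agree. A small illustrative snippet, not taken from the patch:

        unsigned int max_bio_size = 1U << 20;   /* placeholder value */
        unsigned int limit        = 1U << 15;   /* placeholder value */

        max_bio_size = min(max_bio_size, limit);        /* fine: both unsigned int */
        /* max_bio_size = min(max_bio_size, 1 << 15);
         *      would warn about distinct types (int vs unsigned int); min_t()
         *      was presumably there to paper over exactly that kind of
         *      mismatch while max_bio_size was still an int. */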
@@ -1465,21 +1276,21 @@ int drbd_send_bitmap(struct drbd_conf *mdev)
        return err;
 }
 
-void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
+void drbd_send_b_ack(struct drbd_tconn *tconn, u32 barrier_nr, u32 set_size)
 {
        struct drbd_socket *sock;
        struct p_barrier_ack *p;
 
-       if (mdev->state.conn < C_CONNECTED)
+       if (tconn->cstate < C_WF_REPORT_PARAMS)
                return;
 
-       sock = &mdev->tconn->meta;
-       p = drbd_prepare_command(mdev, sock);
+       sock = &tconn->meta;
+       p = conn_prepare_command(tconn, sock);
        if (!p)
                return;
        p->barrier = barrier_nr;
        p->set_size = cpu_to_be32(set_size);
-       drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
+       conn_send_command(tconn, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
 }
 
 /**
@@ -1729,7 +1540,7 @@ static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
        struct bio_vec *bvec;
        int i;
        /* hint all but last page with MSG_MORE */
-       __bio_for_each_segment(bvec, bio, i, 0) {
+       bio_for_each_segment(bvec, bio, i) {
                int err;
 
                err = _drbd_no_send_page(mdev, bvec->bv_page,
@@ -1746,7 +1557,7 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
        struct bio_vec *bvec;
        int i;
        /* hint all but last page with MSG_MORE */
-       __bio_for_each_segment(bvec, bio, i, 0) {
+       bio_for_each_segment(bvec, bio, i) {
                int err;
 
                err = _drbd_send_page(mdev, bvec->bv_page,
@@ -1808,7 +1619,7 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
                return -EIO;
        p->sector = cpu_to_be64(req->i.sector);
        p->block_id = (unsigned long)req;
-       p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
+       p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
        dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
        if (mdev->state.conn >= C_SYNC_SOURCE &&
            mdev->state.conn <= C_PAUSED_SYNC_T)
@@ -1882,6 +1693,7 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
        p->sector = cpu_to_be64(peer_req->i.sector);
        p->block_id = peer_req->block_id;
        p->seq_num = 0;  /* unused */
+       p->dp_flags = 0;
        if (dgs)
                drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
        err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
@@ -2177,8 +1989,7 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
        D_ASSERT(list_empty(&mdev->read_ee));
        D_ASSERT(list_empty(&mdev->net_ee));
        D_ASSERT(list_empty(&mdev->resync_reads));
-       D_ASSERT(list_empty(&mdev->tconn->data.work.q));
-       D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
+       D_ASSERT(list_empty(&mdev->tconn->sender_work.q));
        D_ASSERT(list_empty(&mdev->resync_work.list));
        D_ASSERT(list_empty(&mdev->unplug_work.list));
        D_ASSERT(list_empty(&mdev->go_diskless.list));
@@ -2353,7 +2164,6 @@ void drbd_minor_destroy(struct kref *kref)
 
        /* paranoia asserts */
        D_ASSERT(mdev->open_cnt == 0);
-       D_ASSERT(list_empty(&mdev->tconn->data.work.q));
        /* end paranoia asserts */
 
        /* cleanup stuff that may have been allocated during
@@ -2384,6 +2194,85 @@ void drbd_minor_destroy(struct kref *kref)
        kref_put(&tconn->kref, &conn_destroy);
 }
 
+/* One global retry thread, if we need to push back some bio and have it
+ * reinserted through our make request function.
+ */
+static struct retry_worker {
+       struct workqueue_struct *wq;
+       struct work_struct worker;
+
+       spinlock_t lock;
+       struct list_head writes;
+} retry;
+
+static void do_retry(struct work_struct *ws)
+{
+       struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
+       LIST_HEAD(writes);
+       struct drbd_request *req, *tmp;
+
+       spin_lock_irq(&retry->lock);
+       list_splice_init(&retry->writes, &writes);
+       spin_unlock_irq(&retry->lock);
+
+       list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
+               struct drbd_conf *mdev = req->w.mdev;
+               struct bio *bio = req->master_bio;
+               unsigned long start_time = req->start_time;
+               bool expected;
+
+               expected = 
+                       expect(atomic_read(&req->completion_ref) == 0) &&
+                       expect(req->rq_state & RQ_POSTPONED) &&
+                       expect((req->rq_state & RQ_LOCAL_PENDING) == 0 ||
+                               (req->rq_state & RQ_LOCAL_ABORTED) != 0);
+
+               if (!expected)
+                       dev_err(DEV, "req=%p completion_ref=%d rq_state=%x\n",
+                               req, atomic_read(&req->completion_ref),
+                               req->rq_state);
+
+               /* We still need to put one kref associated with the
+                * "completion_ref" going zero in the code path that queued it
+                * here.  The request object may still be referenced by a
+                * frozen local req->private_bio, in case we force-detached.
+                */
+               kref_put(&req->kref, drbd_req_destroy);
+
+               /* A single suspended or otherwise blocking device may stall
+                * all others as well.  Fortunately, this code path is to
+                * recover from a situation that "should not happen":
+                * concurrent writes in multi-primary setup.
+                * In a "normal" lifecycle, this workqueue is supposed to be
+                * destroyed without ever doing anything.
+                * If it turns out to be an issue anyways, we can do per
+                * resource (replication group) or per device (minor) retry
+                * workqueues instead.
+                */
+
+               /* We are not just doing generic_make_request(),
+                * as we want to keep the start_time information. */
+               inc_ap_bio(mdev);
+               __drbd_make_request(mdev, bio, start_time);
+       }
+}
+
+void drbd_restart_request(struct drbd_request *req)
+{
+       unsigned long flags;
+       spin_lock_irqsave(&retry.lock, flags);
+       list_move_tail(&req->tl_requests, &retry.writes);
+       spin_unlock_irqrestore(&retry.lock, flags);
+
+       /* Drop the extra reference that would otherwise
+        * have been dropped by complete_master_bio.
+        * do_retry() needs to grab a new one. */
+       dec_ap_bio(req->w.mdev);
+
+       queue_work(retry.wq, &retry.worker);
+}
+
+
 static void drbd_cleanup(void)
 {
        unsigned int i;
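
The retry machinery added above (retry_worker, do_retry(), drbd_restart_request()) is an instance of a common kernel pattern: producers park items on a spinlock-protected list from any context, and a single worker drains the whole list in process context. A generic, self-contained sketch of that pattern, with invented names rather than DRBD's actual code:

        struct deferred_item {
                struct list_head list;
                /* ... payload ... */
        };

        static LIST_HEAD(deferred_items);
        static DEFINE_SPINLOCK(deferred_lock);

        static void deferred_fn(struct work_struct *ws)
        {
                LIST_HEAD(batch);
                struct deferred_item *it, *tmp;

                spin_lock_irq(&deferred_lock);
                list_splice_init(&deferred_items, &batch);      /* grab everything queued so far */
                spin_unlock_irq(&deferred_lock);

                list_for_each_entry_safe(it, tmp, &batch, list)
                        handle_deferred(it);                    /* hypothetical per-item handler */
        }
        static DECLARE_WORK(deferred_work, deferred_fn);

        static void defer_item(struct deferred_item *it)        /* callable from any context */
        {
                unsigned long flags;

                spin_lock_irqsave(&deferred_lock, flags);
                list_add_tail(&it->list, &deferred_items);
                spin_unlock_irqrestore(&deferred_lock, flags);
                schedule_work(&deferred_work);
        }

The patch uses its own singlethread workqueue ("drbd-reissue") instead of schedule_work(), and re-takes a reference via inc_ap_bio() before resubmitting, but the list/lock/worker shape is the same.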
@@ -2403,6 +2292,9 @@ static void drbd_cleanup(void)
        if (drbd_proc)
                remove_proc_entry("drbd", NULL);
 
+       if (retry.wq)
+               destroy_workqueue(retry.wq);
+
        drbd_genl_unregister();
 
        idr_for_each_entry(&minors, mdev, i) {
@@ -2429,9 +2321,9 @@ static void drbd_cleanup(void)
 }
 
 /**
- * drbd_congested() - Callback for pdflush
+ * drbd_congested() - Callback for the flusher thread
  * @congested_data:    User data
- * @bdi_bits:          Bits pdflush is currently interested in
+ * @bdi_bits:          Bits the BDI flusher thread is currently interested in
  *
  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
  */
@@ -2449,6 +2341,22 @@ static int drbd_congested(void *congested_data, int bdi_bits)
                goto out;
        }
 
+       if (test_bit(CALLBACK_PENDING, &mdev->tconn->flags)) {
+               r |= (1 << BDI_async_congested);
+               /* Without good local data, we would need to read from remote,
+                * and that would need the worker thread as well, which is
+                * currently blocked waiting for that usermode helper to
+                * finish.
+                */
+               if (!get_ldev_if_state(mdev, D_UP_TO_DATE))
+                       r |= (1 << BDI_sync_congested);
+               else
+                       put_ldev(mdev);
+               r &= bdi_bits;
+               reason = 'c';
+               goto out;
+       }
+
        if (get_ldev(mdev)) {
                q = bdev_get_queue(mdev->ldev->backing_bdev);
                r = bdi_congested(&q->backing_dev_info, bdi_bits);
@@ -2469,9 +2377,9 @@ out:
 
 static void drbd_init_workqueue(struct drbd_work_queue* wq)
 {
-       sema_init(&wq->s, 0);
        spin_lock_init(&wq->q_lock);
        INIT_LIST_HEAD(&wq->q);
+       init_waitqueue_head(&wq->q_wait);
 }
 
 struct drbd_tconn *conn_get_by_name(const char *name)
@@ -2568,10 +2476,10 @@ int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
        /* silently ignore cpu mask on UP kernel */
        if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
                /* FIXME: Get rid of constant 32 here */
-               err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
-                               cpumask_bits(new_cpu_mask), nr_cpu_ids);
+               err = bitmap_parse(res_opts->cpu_mask, 32,
+                                  cpumask_bits(new_cpu_mask), nr_cpu_ids);
                if (err) {
-                       conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
+                       conn_warn(tconn, "bitmap_parse() failed with %d\n", err);
                        /* retcode = ERR_CPU_MASK_PARSE; */
                        goto fail;
                }
@@ -2616,17 +2524,21 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
        if (set_resource_options(tconn, res_opts))
                goto fail;
 
-       if (!tl_init(tconn))
-               goto fail;
-
        tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
        if (!tconn->current_epoch)
                goto fail;
+
+       INIT_LIST_HEAD(&tconn->transfer_log);
+
        INIT_LIST_HEAD(&tconn->current_epoch->list);
        tconn->epochs = 1;
        spin_lock_init(&tconn->epoch_lock);
        tconn->write_ordering = WO_bdev_flush;
 
+       tconn->send.seen_any_write_yet = false;
+       tconn->send.current_epoch_nr = 0;
+       tconn->send.current_epoch_writes = 0;
+
        tconn->cstate = C_STANDALONE;
        mutex_init(&tconn->cstate_mutex);
        spin_lock_init(&tconn->req_lock);
@@ -2634,10 +2546,8 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
        init_waitqueue_head(&tconn->ping_wait);
        idr_init(&tconn->volumes);
 
-       drbd_init_workqueue(&tconn->data.work);
+       drbd_init_workqueue(&tconn->sender_work);
        mutex_init(&tconn->data.mutex);
-
-       drbd_init_workqueue(&tconn->meta.work);
        mutex_init(&tconn->meta.mutex);
 
        drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
@@ -2651,7 +2561,6 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
 
 fail:
        kfree(tconn->current_epoch);
-       tl_cleanup(tconn);
        free_cpumask_var(tconn->cpu_mask);
        drbd_free_socket(&tconn->meta);
        drbd_free_socket(&tconn->data);
@@ -2734,6 +2643,7 @@ enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor,
        q->backing_dev_info.congested_data = mdev;
 
        blk_queue_make_request(q, drbd_make_request);
+       blk_queue_flush(q, REQ_FLUSH | REQ_FUA);
        /* Setting the max_hw_sectors to an odd value of 8kibyte here
           This triggers a max_bio_size message upon first attach or connect */
        blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
@@ -2852,6 +2762,15 @@ int __init drbd_init(void)
        rwlock_init(&global_state_lock);
        INIT_LIST_HEAD(&drbd_tconns);
 
+       retry.wq = create_singlethread_workqueue("drbd-reissue");
+       if (!retry.wq) {
+               printk(KERN_ERR "drbd: unable to create retry workqueue\n");
+               goto fail;
+       }
+       INIT_WORK(&retry.worker, do_retry);
+       spin_lock_init(&retry.lock);
+       INIT_LIST_HEAD(&retry.writes);
+
        printk(KERN_INFO "drbd: initialized. "
               "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
               API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
@@ -2902,6 +2821,22 @@ void drbd_free_sock(struct drbd_tconn *tconn)
 
 /* meta data management */
 
+void conn_md_sync(struct drbd_tconn *tconn)
+{
+       struct drbd_conf *mdev;
+       int vnr;
+
+       rcu_read_lock();
+       idr_for_each_entry(&tconn->volumes, mdev, vnr) {
+               kref_get(&mdev->kref);
+               rcu_read_unlock();
+               drbd_md_sync(mdev);
+               kref_put(&mdev->kref, &drbd_minor_destroy);
+               rcu_read_lock();
+       }
+       rcu_read_unlock();
+}
+
 struct meta_data_on_disk {
        u64 la_size;           /* last agreed size. */
        u64 uuid[UI_SIZE];   /* UUIDs. */
@@ -2967,7 +2902,7 @@ void drbd_md_sync(struct drbd_conf *mdev)
        if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
                /* this was a try anyways ... */
                dev_err(DEV, "meta data update failed!\n");
-               drbd_chk_io_error(mdev, 1, true);
+               drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
        }
 
        /* Update mdev->ldev->md.la_size_sect,
@@ -3059,9 +2994,9 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
 
        spin_lock_irq(&mdev->tconn->req_lock);
        if (mdev->state.conn < C_CONNECTED) {
-               int peer;
+               unsigned int peer;
                peer = be32_to_cpu(buffer->la_peer_max_bio_size);
-               peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
+               peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
                mdev->peer_max_bio_size = peer;
        }
        spin_unlock_irq(&mdev->tconn->req_lock);
@@ -3099,7 +3034,7 @@ void drbd_md_mark_dirty(struct drbd_conf *mdev)
 }
 #endif
 
-static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
+void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
 {
        int i;
 
@@ -3107,7 +3042,7 @@ static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
                mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
 }
 
-void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
+void __drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
 {
        if (idx == UI_CURRENT) {
                if (mdev->state.role == R_PRIMARY)
@@ -3122,14 +3057,24 @@ void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
        drbd_md_mark_dirty(mdev);
 }
 
+void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
+{
+       unsigned long flags;
+       spin_lock_irqsave(&mdev->ldev->md.uuid_lock, flags);
+       __drbd_uuid_set(mdev, idx, val);
+       spin_unlock_irqrestore(&mdev->ldev->md.uuid_lock, flags);
+}
 
 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
 {
+       unsigned long flags;
+       spin_lock_irqsave(&mdev->ldev->md.uuid_lock, flags);
        if (mdev->ldev->md.uuid[idx]) {
                drbd_uuid_move_history(mdev);
                mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
        }
-       _drbd_uuid_set(mdev, idx, val);
+       __drbd_uuid_set(mdev, idx, val);
+       spin_unlock_irqrestore(&mdev->ldev->md.uuid_lock, flags);
 }
 
 /**
@@ -3142,15 +3087,20 @@ void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
 {
        u64 val;
-       unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
+       unsigned long long bm_uuid;
+
+       get_random_bytes(&val, sizeof(u64));
+
+       spin_lock_irq(&mdev->ldev->md.uuid_lock);
+       bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
 
        if (bm_uuid)
                dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
 
        mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
+       __drbd_uuid_set(mdev, UI_CURRENT, val);
+       spin_unlock_irq(&mdev->ldev->md.uuid_lock);
 
-       get_random_bytes(&val, sizeof(u64));
-       _drbd_uuid_set(mdev, UI_CURRENT, val);
        drbd_print_uuids(mdev, "new current UUID");
        /* get it to stable storage _now_ */
        drbd_md_sync(mdev);
@@ -3158,9 +3108,11 @@ void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
 
 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
 {
+       unsigned long flags;
        if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
                return;
 
+       spin_lock_irqsave(&mdev->ldev->md.uuid_lock, flags);
        if (val == 0) {
                drbd_uuid_move_history(mdev);
                mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
@@ -3172,6 +3124,8 @@ void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
 
                mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
        }
+       spin_unlock_irqrestore(&mdev->ldev->md.uuid_lock, flags);
+
        drbd_md_mark_dirty(mdev);
 }
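
The UUID helpers above also follow the usual locking naming convention: __drbd_uuid_set() assumes md.uuid_lock is already held, while _drbd_uuid_set() acquires it with irqsave so it can be called from any context. A minimal sketch of that convention, with invented names rather than DRBD code:

        struct counter {
                spinlock_t lock;
                u64        value;
        };

        static void __counter_set(struct counter *c, u64 val)  /* caller holds c->lock */
        {
                c->value = val;
        }

        static void counter_set(struct counter *c, u64 val)    /* takes the lock itself */
        {
                unsigned long flags;

                spin_lock_irqsave(&c->lock, flags);
                __counter_set(c, val);
                spin_unlock_irqrestore(&c->lock, flags);
        }

Callers that already hold the lock, such as drbd_uuid_new_current() above (which takes spin_lock_irq() itself), use the double-underscore variant directly.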
 
@@ -3273,6 +3227,30 @@ static int w_go_diskless(struct drbd_work *w, int unused)
         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
         * the protected members anymore, though, so once put_ldev reaches zero
         * again, it will be safe to free them. */
+
+       /* Try to write changed bitmap pages, read errors may have just
+        * set some bits outside the area covered by the activity log.
+        *
+        * If we have an IO error during the bitmap writeout,
+        * we will want a full sync next time, just in case.
+        * (Do we want a specific meta data flag for this?)
+        *
+        * If that does not make it to stable storage either,
+        * we cannot do anything about that anymore.
+        *
+        * We still need to check if both bitmap and ldev are present, we may
+        * end up here after a failed attach, before ldev was even assigned.
+        */
+       if (mdev->bitmap && mdev->ldev) {
+               if (drbd_bitmap_io_from_worker(mdev, drbd_bm_write,
+                                       "detach", BM_LOCKED_MASK)) {
+                       if (test_bit(WAS_READ_ERROR, &mdev->flags)) {
+                               drbd_md_set_flag(mdev, MDF_FULL_SYNC);
+                               drbd_md_sync(mdev);
+                       }
+               }
+       }
+
        drbd_force_state(mdev, NS(disk, D_DISKLESS));
        return 0;
 }
@@ -3281,7 +3259,7 @@ void drbd_go_diskless(struct drbd_conf *mdev)
 {
        D_ASSERT(mdev->state.disk == D_FAILED);
        if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
-               drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
+               drbd_queue_work(&mdev->tconn->sender_work, &mdev->go_diskless);
 }
 
 /**
@@ -3319,7 +3297,7 @@ void drbd_queue_bitmap_io(struct drbd_conf *mdev,
        set_bit(BITMAP_IO, &mdev->flags);
        if (atomic_read(&mdev->ap_bio_cnt) == 0) {
                if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
-                       drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
+                       drbd_queue_work(&mdev->tconn->sender_work, &mdev->bm_io_work.w);
        }
        spin_unlock_irq(&mdev->tconn->req_lock);
 }
@@ -3377,7 +3355,9 @@ static void md_sync_timer_fn(unsigned long data)
 {
        struct drbd_conf *mdev = (struct drbd_conf *) data;
 
-       drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
+       /* must not double-queue! */
+       if (list_empty(&mdev->md_sync_work.list))
+               drbd_queue_work_front(&mdev->tconn->sender_work, &mdev->md_sync_work);
 }
 
 static int w_md_sync(struct drbd_work *w, int unused)
@@ -3423,7 +3403,7 @@ const char *cmdname(enum drbd_packet cmd)
                [P_RECV_ACK]            = "RecvAck",
                [P_WRITE_ACK]           = "WriteAck",
                [P_RS_WRITE_ACK]        = "RSWriteAck",
-               [P_DISCARD_WRITE]        = "DiscardWrite",
+               [P_SUPERSEDED]          = "Superseded",
                [P_NEG_ACK]             = "NegAck",
                [P_NEG_DREPLY]          = "NegDReply",
                [P_NEG_RS_DREPLY]       = "NegRSDReply",
@@ -3576,12 +3556,11 @@ const char *drbd_buildtag(void)
        static char buildtag[38] = "\0uilt-in";
 
        if (buildtag[0] == 0) {
-#ifdef CONFIG_MODULES
-               if (THIS_MODULE != NULL)
-                       sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
-               else
+#ifdef MODULE
+               sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
+#else
+               buildtag[0] = 'b';
 #endif
-                       buildtag[0] = 'b';
        }
 
        return buildtag;