drbd: log request sector offset and size for IO errors
drivers/block/drbd/drbd_worker.c (firefly-linux-kernel-4.4.55)
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
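/*
 * A minimal sketch of that locking pattern, for illustration only (the
 * helper name below is hypothetical; the real handlers follow right after):
 * any lock shared with process context must be taken irq-safe here.
 */
#if 0	/* illustration only */
static void example_completion_locking(struct drbd_conf *mdev)
{
	unsigned long flags;

	spin_lock_irqsave(&mdev->req_lock, flags);
	/* update counters, move list entries, check error flags ... */
	spin_unlock_irqrestore(&mdev->req_lock, flags);
	/* wake-ups and work queueing are done after dropping the lock */
}
#endif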
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
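/*
 * Sketch of the intended usage, for illustration only (the function and its
 * body here are hypothetical; the real write-side users are
 * resume_next_sg(), suspend_other_sg() and drbd_alter_sa() further down):
 */
#if 0	/* illustration only */
static void example_global_state_lock_usage(void)
{
	/* a state transition on a single device takes the read side */
	read_lock(&global_state_lock);
	/* ... change the state of one device ... */
	read_unlock(&global_state_lock);

	/* evaluating sync-after dependencies needs a stable view of all
	 * devices, so it takes the write side */
	write_lock_irq(&global_state_lock);
	/* ... walk all minors, pause or resume their resync ... */
	write_unlock_irq(&global_state_lock);
}
#endif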
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73         struct drbd_conf *mdev;
74
75         md_io = (struct drbd_md_io *)bio->bi_private;
76         mdev = container_of(md_io, struct drbd_conf, md_io);
77
78         md_io->error = error;
79
80         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
81          * to timeout on the lower level device, and eventually detach from it.
82          * If this io completion runs after that timeout expired, this
83          * drbd_md_put_buffer() may allow us to finally try and re-attach.
84          * During normal operation, this only puts that extra reference
85          * down to 1 again.
86          * Make sure we first drop the reference, and only then signal
87          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
88          * next drbd_md_sync_page_io(), that we trigger the
89          * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
90          */
91         drbd_md_put_buffer(mdev);
92         md_io->done = 1;
93         wake_up(&mdev->misc_wait);
94         bio_put(bio);
95         put_ldev(mdev);
96 }
97
98 /* reads on behalf of the partner,
99  * "submitted" by the receiver
100  */
101 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
102 {
103         unsigned long flags = 0;
104         struct drbd_conf *mdev = e->mdev;
105
106         D_ASSERT(e->block_id != ID_VACANT);
107
108         spin_lock_irqsave(&mdev->req_lock, flags);
109         mdev->read_cnt += e->size >> 9;
110         list_del(&e->w.list);
111         if (list_empty(&mdev->read_ee))
112                 wake_up(&mdev->ee_wait);
113         if (test_bit(__EE_WAS_ERROR, &e->flags))
114                 __drbd_chk_io_error(mdev, DRBD_READ_ERROR);
115         spin_unlock_irqrestore(&mdev->req_lock, flags);
116
117         drbd_queue_work(&mdev->data.work, &e->w);
118         put_ldev(mdev);
119 }
120
121 /* writes on behalf of the partner, or resync writes,
122  * "submitted" by the receiver, final stage.  */
123 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
124 {
125         unsigned long flags = 0;
126         struct drbd_conf *mdev = e->mdev;
127         sector_t e_sector;
128         int do_wake;
129         int is_syncer_req;
130         int do_al_complete_io;
131
132         D_ASSERT(e->block_id != ID_VACANT);
133
134         /* after we moved e to done_ee,
135          * we may no longer access it,
136          * it may be freed/reused already!
137          * (as soon as we release the req_lock) */
138         e_sector = e->sector;
139         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
140         is_syncer_req = is_syncer_block_id(e->block_id);
141
142         spin_lock_irqsave(&mdev->req_lock, flags);
143         mdev->writ_cnt += e->size >> 9;
144         list_del(&e->w.list); /* has been on active_ee or sync_ee */
145         list_add_tail(&e->w.list, &mdev->done_ee);
146
147         /* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
148          * neither did we wake possibly waiting conflicting requests.
149          * done from "drbd_process_done_ee" within the appropriate w.cb
150          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
151
152         do_wake = is_syncer_req
153                 ? list_empty(&mdev->sync_ee)
154                 : list_empty(&mdev->active_ee);
155
156         if (test_bit(__EE_WAS_ERROR, &e->flags))
157                 __drbd_chk_io_error(mdev, DRBD_WRITE_ERROR);
158         spin_unlock_irqrestore(&mdev->req_lock, flags);
159
160         if (is_syncer_req)
161                 drbd_rs_complete_io(mdev, e_sector);
162
163         if (do_wake)
164                 wake_up(&mdev->ee_wait);
165
166         if (do_al_complete_io)
167                 drbd_al_complete_io(mdev, e_sector);
168
169         wake_asender(mdev);
170         put_ldev(mdev);
171 }
172
173 /* writes on behalf of the partner, or resync writes,
174  * "submitted" by the receiver.
175  */
176 void drbd_endio_sec(struct bio *bio, int error)
177 {
178         struct drbd_epoch_entry *e = bio->bi_private;
179         struct drbd_conf *mdev = e->mdev;
180         int uptodate = bio_flagged(bio, BIO_UPTODATE);
181         int is_write = bio_data_dir(bio) == WRITE;
182
183         if (error && __ratelimit(&drbd_ratelimit_state))
184                 dev_warn(DEV, "%s: error=%d s=%llus\n",
185                                 is_write ? "write" : "read", error,
186                                 (unsigned long long)e->sector);
187         if (!error && !uptodate) {
188                 if (__ratelimit(&drbd_ratelimit_state))
189                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
190                                         is_write ? "write" : "read",
191                                         (unsigned long long)e->sector);
192                 /* strange behavior of some lower level drivers...
193                  * fail the request by clearing the uptodate flag,
194                  * but do not return any error?! */
195                 error = -EIO;
196         }
197
198         if (error)
199                 set_bit(__EE_WAS_ERROR, &e->flags);
200
201         bio_put(bio); /* no need for the bio anymore */
202         if (atomic_dec_and_test(&e->pending_bios)) {
203                 if (is_write)
204                         drbd_endio_write_sec_final(e);
205                 else
206                         drbd_endio_read_sec_final(e);
207         }
208 }
209
210 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
211  */
212 void drbd_endio_pri(struct bio *bio, int error)
213 {
214         unsigned long flags;
215         struct drbd_request *req = bio->bi_private;
216         struct drbd_conf *mdev = req->mdev;
217         struct bio_and_error m;
218         enum drbd_req_event what;
219         int uptodate = bio_flagged(bio, BIO_UPTODATE);
220
221         if (!error && !uptodate) {
222                 dev_warn(DEV, "p %s: setting error to -EIO\n",
223                          bio_data_dir(bio) == WRITE ? "write" : "read");
224                 /* strange behavior of some lower level drivers...
225                  * fail the request by clearing the uptodate flag,
226                  * but do not return any error?! */
227                 error = -EIO;
228         }
229
230         /* If this request was aborted locally before,
231          * but now was completed "successfully",
232          * chances are that this caused arbitrary data corruption.
233          *
234          * "aborting" requests, or force-detaching the disk, is intended for
235          * completely blocked/hung local backing devices which no longer
236          * complete requests at all, not even with error completions.  In this
237          * situation, usually a hard-reset and failover is the only way out.
238          *
239          * By "aborting", basically faking a local error-completion,
240          * we allow for a more graceful switchover by cleanly migrating services.
241          * Still the affected node has to be rebooted "soon".
242          *
243          * By completing these requests, we allow the upper layers to re-use
244          * the associated data pages.
245          *
246          * If later the local backing device "recovers", and now DMAs some data
247          * from disk into the original request pages, in the best case it will
248          * just put random data into unused pages; but typically it will corrupt
249          * meanwhile completely unrelated data, causing all sorts of damage.
250          *
251          * Which means delayed successful completion,
252          * especially for READ requests,
253          * is a reason to panic().
254          *
255          * We assume that a delayed *error* completion is OK,
256          * though we still will complain noisily about it.
257          */
258         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
259                 if (__ratelimit(&drbd_ratelimit_state))
260                         dev_emerg(DEV, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
261
262                 if (!error)
263                         panic("possible random memory corruption caused by delayed completion of aborted local request\n");
264         }
265
266         /* to avoid recursion in __req_mod */
267         if (unlikely(error)) {
268                 what = (bio_data_dir(bio) == WRITE)
269                         ? write_completed_with_error
270                         : (bio_rw(bio) == READ)
271                           ? read_completed_with_error
272                           : read_ahead_completed_with_error;
273         } else
274                 what = completed_ok;
275
276         bio_put(req->private_bio);
277         req->private_bio = ERR_PTR(error);
278
279         /* not req_mod(), we need irqsave here! */
280         spin_lock_irqsave(&mdev->req_lock, flags);
281         __req_mod(req, what, &m);
282         spin_unlock_irqrestore(&mdev->req_lock, flags);
283         put_ldev(mdev);
284
285         if (m.bio)
286                 complete_master_bio(mdev, &m);
287 }
288
289 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
290 {
291         struct drbd_request *req = container_of(w, struct drbd_request, w);
292
293         /* We should not detach for read io-error,
294          * but try to WRITE the P_DATA_REPLY to the failed location,
295          * to give the disk the chance to relocate that block */
296
297         spin_lock_irq(&mdev->req_lock);
298         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
299                 _req_mod(req, read_retry_remote_canceled);
300                 spin_unlock_irq(&mdev->req_lock);
301                 return 1;
302         }
303         spin_unlock_irq(&mdev->req_lock);
304
305         return w_send_read_req(mdev, w, 0);
306 }
307
308 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
309 {
310         struct hash_desc desc;
311         struct scatterlist sg;
312         struct page *page = e->pages;
313         struct page *tmp;
314         unsigned len;
315
316         desc.tfm = tfm;
317         desc.flags = 0;
318
319         sg_init_table(&sg, 1);
320         crypto_hash_init(&desc);
321
322         while ((tmp = page_chain_next(page))) {
323                 /* all but the last page will be fully used */
324                 sg_set_page(&sg, page, PAGE_SIZE, 0);
325                 crypto_hash_update(&desc, &sg, sg.length);
326                 page = tmp;
327         }
328         /* and now the last, possibly only partially used page */
329         len = e->size & (PAGE_SIZE - 1);
330         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
331         crypto_hash_update(&desc, &sg, sg.length);
332         crypto_hash_final(&desc, digest);
333 }
334
335 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
336 {
337         struct hash_desc desc;
338         struct scatterlist sg;
339         struct bio_vec *bvec;
340         int i;
341
342         desc.tfm = tfm;
343         desc.flags = 0;
344
345         sg_init_table(&sg, 1);
346         crypto_hash_init(&desc);
347
348         bio_for_each_segment(bvec, bio, i) {
349                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
350                 crypto_hash_update(&desc, &sg, sg.length);
351         }
352         crypto_hash_final(&desc, digest);
353 }
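/*
 * For context: both helpers above expect an already allocated synchronous
 * hash transform (mdev->csums_tfm / mdev->verify_tfm).  A rough sketch of
 * how such a tfm is typically obtained with this (old) crypto_hash API
 * follows; the function name is illustrative and not taken from the DRBD
 * configuration code.
 */
#if 0	/* illustration only */
static struct crypto_hash *example_alloc_csum_tfm(const char *alg_name)
{
	/* e.g. alg_name = "md5" or "crc32c"; CRYPTO_ALG_ASYNC in the mask
	 * excludes async implementations, which the hash_desc based calls
	 * above cannot drive. */
	struct crypto_hash *tfm = crypto_alloc_hash(alg_name, 0, CRYPTO_ALG_ASYNC);

	if (IS_ERR(tfm))
		return NULL;
	/* size digest buffers via crypto_hash_digestsize(tfm);
	 * release with crypto_free_hash(tfm). */
	return tfm;
}
#endif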
354
355 /* TODO merge common code with w_e_end_ov_req */
356 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
357 {
358         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
359         int digest_size;
360         void *digest;
361         int ok = 1;
362
363         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
364
365         if (unlikely(cancel))
366                 goto out;
367
368                 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
369                 goto out;
370
371         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
372         digest = kmalloc(digest_size, GFP_NOIO);
373         if (digest) {
374                 sector_t sector = e->sector;
375                 unsigned int size = e->size;
376                 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
377                 /* Free e and pages before send.
378                  * In case we block on congestion, we could otherwise run into
379                  * some distributed deadlock, if the other side blocks on
380                  * congestion as well, because our receiver blocks in
381                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
382                 drbd_free_ee(mdev, e);
383                 e = NULL;
384                 inc_rs_pending(mdev);
385                 ok = drbd_send_drequest_csum(mdev, sector, size,
386                                              digest, digest_size,
387                                              P_CSUM_RS_REQUEST);
388                 kfree(digest);
389         } else {
390                 dev_err(DEV, "kmalloc() of digest failed.\n");
391                 ok = 0;
392         }
393
394 out:
395         if (e)
396                 drbd_free_ee(mdev, e);
397
398         if (unlikely(!ok))
399                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
400         return ok;
401 }
402
403 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
404
405 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
406 {
407         struct drbd_epoch_entry *e;
408
409         if (!get_ldev(mdev))
410                 return -EIO;
411
412         if (drbd_rs_should_slow_down(mdev, sector))
413                 goto defer;
414
415         /* GFP_TRY, because if there is no memory available right now, this may
416          * be rescheduled for later. It is "only" background resync, after all. */
417         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
418         if (!e)
419                 goto defer;
420
421         e->w.cb = w_e_send_csum;
422         spin_lock_irq(&mdev->req_lock);
423         list_add(&e->w.list, &mdev->read_ee);
424         spin_unlock_irq(&mdev->req_lock);
425
426         atomic_add(size >> 9, &mdev->rs_sect_ev);
427         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
428                 return 0;
429
430         /* If it failed because of ENOMEM, retry should help.  If it failed
431          * because bio_add_page failed (probably broken lower level driver),
432          * retry may or may not help.
433          * If it does not, you may need to force disconnect. */
434         spin_lock_irq(&mdev->req_lock);
435         list_del(&e->w.list);
436         spin_unlock_irq(&mdev->req_lock);
437
438         drbd_free_ee(mdev, e);
439 defer:
440         put_ldev(mdev);
441         return -EAGAIN;
442 }
443
444 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
445 {
446         switch (mdev->state.conn) {
447         case C_VERIFY_S:
448                 w_make_ov_request(mdev, w, cancel);
449                 break;
450         case C_SYNC_TARGET:
451                 w_make_resync_request(mdev, w, cancel);
452                 break;
453         }
454
455         return 1;
456 }
457
458 void resync_timer_fn(unsigned long data)
459 {
460         struct drbd_conf *mdev = (struct drbd_conf *) data;
461
462         if (list_empty(&mdev->resync_work.list))
463                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
464 }
465
466 static void fifo_set(struct fifo_buffer *fb, int value)
467 {
468         int i;
469
470         for (i = 0; i < fb->size; i++)
471                 fb->values[i] = value;
472 }
473
474 static int fifo_push(struct fifo_buffer *fb, int value)
475 {
476         int ov;
477
478         ov = fb->values[fb->head_index];
479         fb->values[fb->head_index++] = value;
480
481         if (fb->head_index >= fb->size)
482                 fb->head_index = 0;
483
484         return ov;
485 }
486
487 static void fifo_add_val(struct fifo_buffer *fb, int value)
488 {
489         int i;
490
491         for (i = 0; i < fb->size; i++)
492                 fb->values[i] += value;
493 }
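/*
 * Worked example of the three fifo helpers above (sizes and values are made
 * up): with size == 4 and all slots 0, fifo_add_val(fb, 3) turns the ring
 * into {3, 3, 3, 3}; a subsequent fifo_push(fb, 0) returns the value at the
 * head (3), overwrites that slot with 0 and advances head_index, so the ring
 * becomes {0, 3, 3, 3}.  drbd_rs_controller() below relies on exactly this:
 * fifo_add_val() spreads a correction over all planned steps, and
 * fifo_push() pops the share planned for the current step.
 */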
494
495 static int drbd_rs_controller(struct drbd_conf *mdev)
496 {
497         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
498         unsigned int want;     /* The number of sectors we want in the proxy */
499         int req_sect; /* Number of sectors to request in this turn */
500         int correction; /* Number of additional sectors we need in the proxy */
501         int cps; /* correction per invocation of drbd_rs_controller() */
502         int steps; /* Number of time steps to plan ahead */
503         int curr_corr;
504         int max_sect;
505
506         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
507         mdev->rs_in_flight -= sect_in;
508
509         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
510
511         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
512
513         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
514                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
515         } else { /* normal path */
516                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
517                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
518         }
519
520         correction = want - mdev->rs_in_flight - mdev->rs_planed;
521
522         /* Plan ahead */
523         cps = correction / steps;
524         fifo_add_val(&mdev->rs_plan_s, cps);
525         mdev->rs_planed += cps * steps;
526
527         /* What we do in this step */
528         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
529         spin_unlock(&mdev->peer_seq_lock);
530         mdev->rs_planed -= curr_corr;
531
532         req_sect = sect_in + curr_corr;
533         if (req_sect < 0)
534                 req_sect = 0;
535
536         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
537         if (req_sect > max_sect)
538                 req_sect = max_sect;
539
540         /*
541         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
542                  sect_in, mdev->rs_in_flight, want, correction,
543                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
544         */
545
546         return req_sect;
547 }
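/*
 * Worked example for the controller above (numbers are illustrative and
 * assume SLEEP_TIME is 100 ms, i.e. HZ/10, and c_delay_target is given in
 * 0.1 s units): on the normal path,
 * want = sect_in * c_delay_target * HZ / (SLEEP_TIME * 10) reduces to
 * want = sect_in * c_delay_target.  With c_delay_target = 10 and
 * sect_in = 2048 sectors (1 MiB received during the last 100 ms interval)
 * we aim for want = 20480 sectors in flight, i.e. one second worth of data
 * at the currently observed reply rate.  The cap
 * max_sect = c_max_rate * 2 * SLEEP_TIME / HZ is simply the per-interval
 * budget at c_max_rate: for c_max_rate = 102400 KiB/s that is
 * 102400 * 2 * 0.1 = 20480 sectors per 100 ms step.
 */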
548
549 static int drbd_rs_number_requests(struct drbd_conf *mdev)
550 {
551         int number;
552         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
553                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
554                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
555         } else {
556                 mdev->c_sync_rate = mdev->sync_conf.rate;
557                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
558         }
559
560         /* ignore the number of pending requests; the resync controller should
561          * throttle down to the incoming reply rate soon enough anyway. */
562         return number;
563 }
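/*
 * Unit check for the conversion above (assuming 4 KiB bitmap blocks, i.e.
 * BM_BLOCK_SHIFT == 12, and SLEEP_TIME == HZ/10): the controller returns
 * sectors, and ">> (BM_BLOCK_SHIFT - 9)" (>> 3) turns 512-byte sectors into
 * 4 KiB resync requests.  On the fixed-rate path, a configured rate of
 * 10240 KiB/s gives number = SLEEP_TIME * 10240 / (4 * HZ) = 256 requests
 * per 100 ms step, and c_sync_rate = number * HZ * 4 / SLEEP_TIME converts
 * back to 10240 KiB/s for reporting.
 */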
564
565 static int w_make_resync_request(struct drbd_conf *mdev,
566                                  struct drbd_work *w, int cancel)
567 {
568         unsigned long bit;
569         sector_t sector;
570         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
571         int max_bio_size;
572         int number, rollback_i, size;
573         int align, queued, sndbuf;
574         int i = 0;
575
576         if (unlikely(cancel))
577                 return 1;
578
579         if (mdev->rs_total == 0) {
580                 /* empty resync? */
581                 drbd_resync_finished(mdev);
582                 return 1;
583         }
584
585         if (!get_ldev(mdev)) {
586                 /* Since we only need to access mdev->rsync, a
587                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
588                    continuing resync with a broken disk makes no sense at
589                    all */
590                 dev_err(DEV, "Disk broke down during resync!\n");
591                 return 1;
592         }
593
594         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
595         number = drbd_rs_number_requests(mdev);
596         if (number == 0)
597                 goto requeue;
598
599         for (i = 0; i < number; i++) {
600                 /* Stop generating RS requests when half of the send buffer is filled */
601                 mutex_lock(&mdev->data.mutex);
602                 if (mdev->data.socket) {
603                         queued = mdev->data.socket->sk->sk_wmem_queued;
604                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
605                 } else {
606                         queued = 1;
607                         sndbuf = 0;
608                 }
609                 mutex_unlock(&mdev->data.mutex);
610                 if (queued > sndbuf / 2)
611                         goto requeue;
612
613 next_sector:
614                 size = BM_BLOCK_SIZE;
615                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
616
617                 if (bit == DRBD_END_OF_BITMAP) {
618                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
619                         put_ldev(mdev);
620                         return 1;
621                 }
622
623                 sector = BM_BIT_TO_SECT(bit);
624
625                 if (drbd_rs_should_slow_down(mdev, sector) ||
626                     drbd_try_rs_begin_io(mdev, sector)) {
627                         mdev->bm_resync_fo = bit;
628                         goto requeue;
629                 }
630                 mdev->bm_resync_fo = bit + 1;
631
632                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
633                         drbd_rs_complete_io(mdev, sector);
634                         goto next_sector;
635                 }
636
637 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
638                 /* try to find some adjacent bits.
639                  * we stop if we already have the maximum req size.
640                  *
641                  * Additionally always align bigger requests, in order to
642                  * be prepared for all stripe sizes of software RAIDs.
643                  */
644                 align = 1;
645                 rollback_i = i;
646                 for (;;) {
647                         if (size + BM_BLOCK_SIZE > max_bio_size)
648                                 break;
649
650                         /* Always be aligned */
651                         if (sector & ((1<<(align+3))-1))
652                                 break;
653
654                         /* do not cross extent boundaries */
655                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
656                                 break;
657                         /* now, is it actually dirty, after all?
658                          * caution, drbd_bm_test_bit is tri-state for some
659                          * obscure reason; ( b == 0 ) would get the out-of-band
660                          * only accidentally right because of the "oddly sized"
661                          * adjustment below */
662                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
663                                 break;
664                         bit++;
665                         size += BM_BLOCK_SIZE;
666                         if ((BM_BLOCK_SIZE << align) <= size)
667                                 align++;
668                         i++;
669                 }
670                 /* if we merged some,
671                  * reset the offset to start the next drbd_bm_find_next from */
672                 if (size > BM_BLOCK_SIZE)
673                         mdev->bm_resync_fo = bit + 1;
674 #endif
675
676                 /* adjust very last sectors, in case we are oddly sized */
677                 if (sector + (size>>9) > capacity)
678                         size = (capacity-sector)<<9;
679                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
680                         switch (read_for_csum(mdev, sector, size)) {
681                         case -EIO: /* Disk failure */
682                                 put_ldev(mdev);
683                                 return 0;
684                         case -EAGAIN: /* allocation failed, or ldev busy */
685                                 drbd_rs_complete_io(mdev, sector);
686                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
687                                 i = rollback_i;
688                                 goto requeue;
689                         case 0:
690                                 /* everything ok */
691                                 break;
692                         default:
693                                 BUG();
694                         }
695                 } else {
696                         inc_rs_pending(mdev);
697                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
698                                                sector, size, ID_SYNCER)) {
699                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
700                                 dec_rs_pending(mdev);
701                                 put_ldev(mdev);
702                                 return 0;
703                         }
704                 }
705         }
706
707         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
708                 /* last syncer _request_ was sent,
709                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
710                  * next sync group will resume), as soon as we receive the last
711                  * resync data block, and the last bit is cleared.
712                  * until then resync "work" is "inactive" ...
713                  */
714                 put_ldev(mdev);
715                 return 1;
716         }
717
718  requeue:
719         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
720         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
721         put_ldev(mdev);
722         return 1;
723 }
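/*
 * Worked example for the merge/align loop in w_make_resync_request() above:
 * with align == 1 the test "sector & ((1 << (align + 3)) - 1)" requires the
 * request to start on a 16-sector (8 KiB) boundary before it may grow beyond
 * one 4 KiB bitmap block; once size has doubled, align is bumped and a
 * 32-sector (16 KiB) boundary is required, and so on.  So a request starting
 * at sector 8 stays a single 4 KiB request (8 & 15 != 0), one starting at
 * sector 16 can grow to 8 KiB but no further (16 & 31 != 0), while one
 * starting at sector 2048 (a 1 MiB boundary) may keep merging until it hits
 * max_bio_size, an extent boundary, or a clean bit.
 */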
724
725 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
726 {
727         int number, i, size;
728         sector_t sector;
729         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
730         bool stop_sector_reached = false;
731
732         if (unlikely(cancel))
733                 return 1;
734
735         number = drbd_rs_number_requests(mdev);
736
737         sector = mdev->ov_position;
738         for (i = 0; i < number; i++) {
739                 if (sector >= capacity)
740                         return 1;
741
742                 /* We check for "finished" only in the reply path:
743                  * w_e_end_ov_reply().
744                  * We need to send at least one request out. */
745                 stop_sector_reached = i > 0
746                         && mdev->agreed_pro_version >= 97
747                         && sector >= mdev->ov_stop_sector;
748                 if (stop_sector_reached)
749                         break;
750
751                 size = BM_BLOCK_SIZE;
752
753                 if (drbd_rs_should_slow_down(mdev, sector) ||
754                     drbd_try_rs_begin_io(mdev, sector)) {
755                         mdev->ov_position = sector;
756                         goto requeue;
757                 }
758
759                 if (sector + (size>>9) > capacity)
760                         size = (capacity-sector)<<9;
761
762                 inc_rs_pending(mdev);
763                 if (!drbd_send_ov_request(mdev, sector, size)) {
764                         dec_rs_pending(mdev);
765                         return 0;
766                 }
767                 sector += BM_SECT_PER_BIT;
768         }
769         mdev->ov_position = sector;
770
771  requeue:
772         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
773         if (i == 0 || !stop_sector_reached)
774                 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
775         return 1;
776 }
777
778
779 void start_resync_timer_fn(unsigned long data)
780 {
781         struct drbd_conf *mdev = (struct drbd_conf *) data;
782
783         drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
784 }
785
786 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
787 {
788         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
789                 dev_warn(DEV, "w_start_resync later...\n");
790                 mdev->start_resync_timer.expires = jiffies + HZ/10;
791                 add_timer(&mdev->start_resync_timer);
792                 return 1;
793         }
794
795         drbd_start_resync(mdev, C_SYNC_SOURCE);
796         drbd_clear_flag(mdev, AHEAD_TO_SYNC_SOURCE);
797         return 1;
798 }
799
800 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
801 {
802         kfree(w);
803         ov_oos_print(mdev);
804         drbd_resync_finished(mdev);
805
806         return 1;
807 }
808
809 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
810 {
811         kfree(w);
812
813         drbd_resync_finished(mdev);
814
815         return 1;
816 }
817
818 static void ping_peer(struct drbd_conf *mdev)
819 {
820         drbd_clear_flag(mdev, GOT_PING_ACK);
821         request_ping(mdev);
822         wait_event(mdev->misc_wait,
823                    drbd_test_flag(mdev, GOT_PING_ACK) || mdev->state.conn < C_CONNECTED);
824 }
825
826 int drbd_resync_finished(struct drbd_conf *mdev)
827 {
828         unsigned long db, dt, dbdt;
829         unsigned long n_oos;
830         union drbd_state os, ns;
831         struct drbd_work *w;
832         char *khelper_cmd = NULL;
833         int verify_done = 0;
834
835         /* Remove all elements from the resync LRU. Since future actions
836          * might set bits in the (main) bitmap, the entries in the
837          * resync LRU would otherwise be wrong. */
838         if (drbd_rs_del_all(mdev)) {
839                 /* In case this is not possible now, most probably because
840                  * there are P_RS_DATA_REPLY packets lingering on the worker's
841                  * queue (or even the read operations for those packets
842                  * are not finished by now).   Retry in 100ms. */
843
844                 schedule_timeout_interruptible(HZ / 10);
845                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
846                 if (w) {
847                         w->cb = w_resync_finished;
848                         drbd_queue_work(&mdev->data.work, w);
849                         return 1;
850                 }
851                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
852         }
853
854         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
855         if (dt <= 0)
856                 dt = 1;
857
858         db = mdev->rs_total;
859         /* adjust for verify start and stop sectors, respectively the reached position */
860         if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
861                 db -= mdev->ov_left;
862
863         dbdt = Bit2KB(db/dt);
864         mdev->rs_paused /= HZ;
865
866         if (!get_ldev(mdev))
867                 goto out;
868
869         ping_peer(mdev);
870
871         spin_lock_irq(&mdev->req_lock);
872         os = mdev->state;
873
874         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
875
876         /* This protects us against multiple calls (that can happen in the presence
877            of application IO), and against connectivity loss just before we arrive here. */
878         if (os.conn <= C_CONNECTED)
879                 goto out_unlock;
880
881         ns = os;
882         ns.conn = C_CONNECTED;
883
884         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
885              verify_done ? "Online verify" : "Resync",
886              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
887
888         n_oos = drbd_bm_total_weight(mdev);
889
890         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
891                 if (n_oos) {
892                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
893                               n_oos, Bit2KB(1));
894                         khelper_cmd = "out-of-sync";
895                 }
896         } else {
897                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
898
899                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
900                         khelper_cmd = "after-resync-target";
901
902                 if (mdev->csums_tfm && mdev->rs_total) {
903                         const unsigned long s = mdev->rs_same_csum;
904                         const unsigned long t = mdev->rs_total;
905                         const int ratio =
906                                 (t == 0)     ? 0 :
907                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
908                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
909                              "transferred %luK total %luK\n",
910                              ratio,
911                              Bit2KB(mdev->rs_same_csum),
912                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
913                              Bit2KB(mdev->rs_total));
914                 }
915         }
916
917         if (mdev->rs_failed) {
918                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
919
920                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
921                         ns.disk = D_INCONSISTENT;
922                         ns.pdsk = D_UP_TO_DATE;
923                 } else {
924                         ns.disk = D_UP_TO_DATE;
925                         ns.pdsk = D_INCONSISTENT;
926                 }
927         } else {
928                 ns.disk = D_UP_TO_DATE;
929                 ns.pdsk = D_UP_TO_DATE;
930
931                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
932                         if (mdev->p_uuid) {
933                                 int i;
934                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
935                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
936                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
937                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
938                         } else {
939                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
940                         }
941                 }
942
943                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
944                         /* for verify runs, we don't update uuids here,
945                          * so there would be nothing to report. */
946                         drbd_uuid_set_bm(mdev, 0UL);
947                         drbd_print_uuids(mdev, "updated UUIDs");
948                         if (mdev->p_uuid) {
949                                 /* Now the two UUID sets are equal, update what we
950                                  * know of the peer. */
951                                 int i;
952                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
953                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
954                         }
955                 }
956         }
957
958         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
959 out_unlock:
960         spin_unlock_irq(&mdev->req_lock);
961         put_ldev(mdev);
962 out:
963         mdev->rs_total  = 0;
964         mdev->rs_failed = 0;
965         mdev->rs_paused = 0;
966
967         /* reset start sector, if we reached end of device */
968         if (verify_done && mdev->ov_left == 0)
969                 mdev->ov_start_sector = 0;
970
971         drbd_md_sync(mdev);
972
973         if (khelper_cmd)
974                 drbd_khelper(mdev, khelper_cmd);
975
976         return 1;
977 }
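/*
 * Note on the checksum ratio computed in drbd_resync_finished() above: the
 * two branches "(s*100)/t" and "s/(t/100)" yield the same percentage; the
 * second form is used for large totals so that s*100 cannot overflow an
 * unsigned long on 32-bit hosts.  E.g. s = 900000 equal checksums out of
 * t = 1000000 total blocks takes the second branch and gives
 * 900000 / (1000000 / 100) = 90 (%).
 */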
978
979 /* helper */
980 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
981 {
982         if (drbd_ee_has_active_page(e)) {
983                 /* This might happen if sendpage() has not finished */
984                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
985                 atomic_add(i, &mdev->pp_in_use_by_net);
986                 atomic_sub(i, &mdev->pp_in_use);
987                 spin_lock_irq(&mdev->req_lock);
988                 list_add_tail(&e->w.list, &mdev->net_ee);
989                 spin_unlock_irq(&mdev->req_lock);
990                 wake_up(&drbd_pp_wait);
991         } else
992                 drbd_free_ee(mdev, e);
993 }
994
995 /**
996  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
997  * @mdev:       DRBD device.
998  * @w:          work object.
999  * @cancel:     The connection will be closed anyways
1000  */
1001 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1002 {
1003         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1004         int ok;
1005
1006         if (unlikely(cancel)) {
1007                 drbd_free_ee(mdev, e);
1008                 dec_unacked(mdev);
1009                 return 1;
1010         }
1011
1012         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1013                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
1014         } else {
1015                 if (__ratelimit(&drbd_ratelimit_state))
1016                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
1017                             (unsigned long long)e->sector);
1018
1019                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
1020         }
1021
1022         dec_unacked(mdev);
1023
1024         move_to_net_ee_or_free(mdev, e);
1025
1026         if (unlikely(!ok))
1027                 dev_err(DEV, "drbd_send_block() failed\n");
1028         return ok;
1029 }
1030
1031 /**
1032  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1033  * @mdev:       DRBD device.
1034  * @w:          work object.
1035  * @cancel:     The connection will be closed anyways
1036  */
1037 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1038 {
1039         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1040         int ok;
1041
1042         if (unlikely(cancel)) {
1043                 drbd_free_ee(mdev, e);
1044                 dec_unacked(mdev);
1045                 return 1;
1046         }
1047
1048         if (get_ldev_if_state(mdev, D_FAILED)) {
1049                 drbd_rs_complete_io(mdev, e->sector);
1050                 put_ldev(mdev);
1051         }
1052
1053         if (mdev->state.conn == C_AHEAD) {
1054                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
1055         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1056                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1057                         inc_rs_pending(mdev);
1058                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1059                 } else {
1060                         if (__ratelimit(&drbd_ratelimit_state))
1061                                 dev_err(DEV, "Not sending RSDataReply, "
1062                                     "partner DISKLESS!\n");
1063                         ok = 1;
1064                 }
1065         } else {
1066                 if (__ratelimit(&drbd_ratelimit_state))
1067                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1068                             (unsigned long long)e->sector);
1069
1070                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1071
1072                 /* update resync data with failure */
1073                 drbd_rs_failed_io(mdev, e->sector, e->size);
1074         }
1075
1076         dec_unacked(mdev);
1077
1078         move_to_net_ee_or_free(mdev, e);
1079
1080         if (unlikely(!ok))
1081                 dev_err(DEV, "drbd_send_block() failed\n");
1082         return ok;
1083 }
1084
1085 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1088         struct digest_info *di;
1089         int digest_size;
1090         void *digest = NULL;
1091         int ok, eq = 0;
1092
1093         if (unlikely(cancel)) {
1094                 drbd_free_ee(mdev, e);
1095                 dec_unacked(mdev);
1096                 return 1;
1097         }
1098
1099         if (get_ldev(mdev)) {
1100                 drbd_rs_complete_io(mdev, e->sector);
1101                 put_ldev(mdev);
1102         }
1103
1104         di = e->digest;
1105
1106         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1107                 /* quick hack to try to avoid a race against reconfiguration.
1108                  * a real fix would be much more involved,
1109                  * introducing more locking mechanisms */
1110                 if (mdev->csums_tfm) {
1111                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1112                         D_ASSERT(digest_size == di->digest_size);
1113                         digest = kmalloc(digest_size, GFP_NOIO);
1114                 }
1115                 if (digest) {
1116                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1117                         eq = !memcmp(digest, di->digest, digest_size);
1118                         kfree(digest);
1119                 }
1120
1121                 if (eq) {
1122                         drbd_set_in_sync(mdev, e->sector, e->size);
1123                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1124                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1125                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1126                 } else {
1127                         inc_rs_pending(mdev);
1128                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1129                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1130                         kfree(di);
1131                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1132                 }
1133         } else {
1134                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1135                 if (__ratelimit(&drbd_ratelimit_state))
1136                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1137         }
1138
1139         dec_unacked(mdev);
1140         move_to_net_ee_or_free(mdev, e);
1141
1142         if (unlikely(!ok))
1143                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1144         return ok;
1145 }
1146
1147 /* TODO merge common code with w_e_send_csum */
1148 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1149 {
1150         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1151         sector_t sector = e->sector;
1152         unsigned int size = e->size;
1153         int digest_size;
1154         void *digest;
1155         int ok = 1;
1156
1157         if (unlikely(cancel))
1158                 goto out;
1159
1160         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1161         digest = kmalloc(digest_size, GFP_NOIO);
1162         if (!digest) {
1163                 ok = 0; /* terminate the connection in case the allocation failed */
1164                 goto out;
1165         }
1166
1167         if (likely(!(e->flags & EE_WAS_ERROR)))
1168                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1169         else
1170                 memset(digest, 0, digest_size);
1171
1172         /* Free e and pages before send.
1173          * In case we block on congestion, we could otherwise run into
1174          * some distributed deadlock, if the other side blocks on
1175          * congestion as well, because our receiver blocks in
1176          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1177         drbd_free_ee(mdev, e);
1178         e = NULL;
1179         inc_rs_pending(mdev);
1180         ok = drbd_send_drequest_csum(mdev, sector, size,
1181                                      digest, digest_size,
1182                                      P_OV_REPLY);
1183         if (!ok)
1184                 dec_rs_pending(mdev);
1185         kfree(digest);
1186
1187 out:
1188         if (e)
1189                 drbd_free_ee(mdev, e);
1190         dec_unacked(mdev);
1191         return ok;
1192 }
1193
1194 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1195 {
1196         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1197                 mdev->ov_last_oos_size += size>>9;
1198         } else {
1199                 mdev->ov_last_oos_start = sector;
1200                 mdev->ov_last_oos_size = size>>9;
1201         }
1202         drbd_set_out_of_sync(mdev, sector, size);
1203 }
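/*
 * Example of the range merging above: with 4 KiB blocks (size 4096, i.e.
 * 8 sectors), an out-of-sync block at sector 1000 followed by one at sector
 * 1008 extends ov_last_oos_size from 8 to 16 sectors instead of starting a
 * new range, so ov_oos_print() later reports one contiguous 8 KiB range
 * rather than two separate 4 KiB ones.
 */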
1204
1205 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1206 {
1207         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1208         struct digest_info *di;
1209         void *digest;
1210         sector_t sector = e->sector;
1211         unsigned int size = e->size;
1212         int digest_size;
1213         int ok, eq = 0;
1214         bool stop_sector_reached = false;
1215
1216         if (unlikely(cancel)) {
1217                 drbd_free_ee(mdev, e);
1218                 dec_unacked(mdev);
1219                 return 1;
1220         }
1221
1222         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1223          * the resync lru has been cleaned up already */
1224         if (get_ldev(mdev)) {
1225                 drbd_rs_complete_io(mdev, e->sector);
1226                 put_ldev(mdev);
1227         }
1228
1229         di = e->digest;
1230
1231         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1232                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1233                 digest = kmalloc(digest_size, GFP_NOIO);
1234                 if (digest) {
1235                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1236
1237                         D_ASSERT(digest_size == di->digest_size);
1238                         eq = !memcmp(digest, di->digest, digest_size);
1239                         kfree(digest);
1240                 }
1241         }
1242
1243         /* Free e and pages before send.
1244          * In case we block on congestion, we could otherwise run into
1245          * some distributed deadlock, if the other side blocks on
1246          * congestion as well, because our receiver blocks in
1247          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1248         drbd_free_ee(mdev, e);
1249         if (!eq)
1250                 drbd_ov_oos_found(mdev, sector, size);
1251         else
1252                 ov_oos_print(mdev);
1253
1254         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1255                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1256
1257         dec_unacked(mdev);
1258
1259         --mdev->ov_left;
1260
1261         /* let's advance progress step marks only for every other megabyte */
1262         if ((mdev->ov_left & 0x200) == 0x200)
1263                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1264
1265         stop_sector_reached = mdev->agreed_pro_version >= 97 &&
1266                 (sector + (size>>9)) >= mdev->ov_stop_sector;
1267
1268         if (mdev->ov_left == 0 || stop_sector_reached) {
1269                 ov_oos_print(mdev);
1270                 drbd_resync_finished(mdev);
1271         }
1272
1273         return ok;
1274 }
1275
1276 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1277 {
1278         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1279         complete(&b->done);
1280         return 1;
1281 }
1282
1283 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1284 {
1285         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1286         struct p_barrier *p = &mdev->data.sbuf.barrier;
1287         int ok = 1;
1288
1289         /* really avoid racing with tl_clear.  w.cb may have been referenced
1290          * just before it was reassigned and re-queued, so double check that.
1291          * actually, this race was harmless, since we only try to send the
1292          * barrier packet here, and otherwise do nothing with the object.
1293          * but compare with the head of w_clear_epoch */
1294         spin_lock_irq(&mdev->req_lock);
1295         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1296                 cancel = 1;
1297         spin_unlock_irq(&mdev->req_lock);
1298         if (cancel)
1299                 return 1;
1300
1301         if (!drbd_get_data_sock(mdev))
1302                 return 0;
1303         p->barrier = b->br_number;
1304         /* inc_ap_pending was done where this was queued.
1305          * dec_ap_pending will be done in got_BarrierAck
1306          * or (on connection loss) in w_clear_epoch.  */
1307         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1308                                 (struct p_header80 *)p, sizeof(*p), 0);
1309         drbd_put_data_sock(mdev);
1310
1311         return ok;
1312 }
1313
1314 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1315 {
1316         if (cancel)
1317                 return 1;
1318         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1319 }
1320
1321 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1322 {
1323         struct drbd_request *req = container_of(w, struct drbd_request, w);
1324         int ok;
1325
1326         if (unlikely(cancel)) {
1327                 req_mod(req, send_canceled);
1328                 return 1;
1329         }
1330
1331         ok = drbd_send_oos(mdev, req);
1332         req_mod(req, oos_handed_to_network);
1333
1334         return ok;
1335 }
1336
1337 /**
1338  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1339  * @mdev:       DRBD device.
1340  * @w:          work object.
1341  * @cancel:     The connection will be closed anyways
1342  */
1343 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1344 {
1345         struct drbd_request *req = container_of(w, struct drbd_request, w);
1346         int ok;
1347
1348         if (unlikely(cancel)) {
1349                 req_mod(req, send_canceled);
1350                 return 1;
1351         }
1352
1353         ok = drbd_send_dblock(mdev, req);
1354         req_mod(req, ok ? handed_over_to_network : send_failed);
1355
1356         return ok;
1357 }
1358
1359 /**
1360  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1361  * @mdev:       DRBD device.
1362  * @w:          work object.
1363  * @cancel:     The connection will be closed anyways
1364  */
1365 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1366 {
1367         struct drbd_request *req = container_of(w, struct drbd_request, w);
1368         int ok;
1369
1370         if (unlikely(cancel)) {
1371                 req_mod(req, send_canceled);
1372                 return 1;
1373         }
1374
1375         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1376                                 (unsigned long)req);
1377
1378         if (!ok) {
1379                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1380                  * so this is probably redundant */
1381                 if (mdev->state.conn >= C_CONNECTED)
1382                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1383         }
1384         req_mod(req, ok ? handed_over_to_network : send_failed);
1385
1386         return ok;
1387 }
1388
1389 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1390 {
1391         struct drbd_request *req = container_of(w, struct drbd_request, w);
1392
1393         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1394                 drbd_al_begin_io(mdev, req->sector);
1395         /* Calling drbd_al_begin_io() out of the worker might deadlock
1396            theoretically. In practice it cannot deadlock, since this is
1397            only used when unfreezing IOs. All the extents of the requests
1398            that made it into the TL are already active */
1399
1400         drbd_req_make_private_bio(req, req->master_bio);
1401         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1402         generic_make_request(req->private_bio);
1403
1404         return 1;
1405 }
1406
1407 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1408 {
1409         struct drbd_conf *odev = mdev;
1410
1411         while (1) {
1412                 if (odev->sync_conf.after == -1)
1413                         return 1;
1414                 odev = minor_to_mdev(odev->sync_conf.after);
1415                 ERR_IF(!odev) return 1;
1416                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1417                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1418                     odev->state.aftr_isp || odev->state.peer_isp ||
1419                     odev->state.user_isp)
1420                         return 0;
1421         }
1422 }
1423
1424 /**
1425  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1426  * @mdev:       DRBD device.
1427  *
1428  * Called from process context only (admin command and after_state_ch).
1429  */
1430 static int _drbd_pause_after(struct drbd_conf *mdev)
1431 {
1432         struct drbd_conf *odev;
1433         int i, rv = 0;
1434
1435         for (i = 0; i < minor_count; i++) {
1436                 odev = minor_to_mdev(i);
1437                 if (!odev)
1438                         continue;
1439                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1440                         continue;
1441                 if (!_drbd_may_sync_now(odev))
1442                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1443                                != SS_NOTHING_TO_DO);
1444         }
1445
1446         return rv;
1447 }
1448
1449 /**
1450  * _drbd_resume_next() - Resume resync on all devices that may resync now
1451  * @mdev:       DRBD device.
1452  *
1453  * Called from process context only (admin command and worker).
1454  */
1455 static int _drbd_resume_next(struct drbd_conf *mdev)
1456 {
1457         struct drbd_conf *odev;
1458         int i, rv = 0;
1459
1460         for (i = 0; i < minor_count; i++) {
1461                 odev = minor_to_mdev(i);
1462                 if (!odev)
1463                         continue;
1464                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1465                         continue;
1466                 if (odev->state.aftr_isp) {
1467                         if (_drbd_may_sync_now(odev))
1468                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1469                                                         CS_HARD, NULL)
1470                                        != SS_NOTHING_TO_DO);
1471                 }
1472         }
1473         return rv;
1474 }
1475
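/* Resume the resync on all devices whose sync-after dependencies allow it
 * now, taking the global_state_lock for the state transitions. */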
1476 void resume_next_sg(struct drbd_conf *mdev)
1477 {
1478         write_lock_irq(&global_state_lock);
1479         _drbd_resume_next(mdev);
1480         write_unlock_irq(&global_state_lock);
1481 }
1482
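/* Pause the resync on all devices whose sync-after dependencies do not
 * allow them to resync now, taking the global_state_lock for the state
 * transitions. */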
1483 void suspend_other_sg(struct drbd_conf *mdev)
1484 {
1485         write_lock_irq(&global_state_lock);
1486         _drbd_pause_after(mdev);
1487         write_unlock_irq(&global_state_lock);
1488 }
1489
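/* Validate a proposed sync-after dependency of @mdev on minor @o_minor
 * (-1 means "no dependency").  Returns NO_ERROR if it is acceptable,
 * ERR_SYNC_AFTER if no such minor exists, and ERR_SYNC_AFTER_CYCLE if
 * following the dependency chain would lead back to @mdev. */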
1490 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1491 {
1492         struct drbd_conf *odev;
1493
1494         if (o_minor == -1)
1495                 return NO_ERROR;
1496         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1497                 return ERR_SYNC_AFTER;
1498
1499         /* check for loops */
1500         odev = minor_to_mdev(o_minor);
1501         while (1) {
1502                 if (odev == mdev)
1503                         return ERR_SYNC_AFTER_CYCLE;
1504
1505                 /* dependency chain ends here, no cycles. */
1506                 if (odev->sync_conf.after == -1)
1507                         return NO_ERROR;
1508
1509                 /* follow the dependency chain */
1510                 odev = minor_to_mdev(odev->sync_conf.after);
1511         }
1512 }
1513
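/* Change the sync-after dependency of @mdev to minor @na.  Under the
 * global_state_lock, validate the new dependency, apply it, and iterate
 * pausing and resuming resyncs until no further state change results.
 * Returns NO_ERROR or one of the ERR_SYNC_AFTER* codes. */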
1514 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1515 {
1516         int changes;
1517         int retcode;
1518
1519         write_lock_irq(&global_state_lock);
1520         retcode = sync_after_error(mdev, na);
1521         if (retcode == NO_ERROR) {
1522                 mdev->sync_conf.after = na;
1523                 do {
1524                         changes  = _drbd_pause_after(mdev);
1525                         changes |= _drbd_resume_next(mdev);
1526                 } while (changes);
1527         }
1528         write_unlock_irq(&global_state_lock);
1529         return retcode;
1530 }
1531
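/* Reset the state of the dynamic resync rate controller: clear the
 * sectors-in-flight and event counters and the resync plan fifo, so the
 * next resync run starts from a clean slate. */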
1532 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1533 {
1534         atomic_set(&mdev->rs_sect_in, 0);
1535         atomic_set(&mdev->rs_sect_ev, 0);
1536         mdev->rs_in_flight = 0;
1537         mdev->rs_planed = 0;
1538         spin_lock(&mdev->peer_seq_lock);
1539         fifo_set(&mdev->rs_plan_s, 0);
1540         spin_unlock(&mdev->peer_seq_lock);
1541 }
1542
1543 /**
1544  * drbd_start_resync() - Start the resync process
1545  * @mdev:       DRBD device.
1546  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1547  *
1548  * This function might bring you directly into one of the
1549  * C_PAUSED_SYNC_* states.
1550  */
1551 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1552 {
1553         union drbd_state ns;
1554         int r;
1555
1556         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1557                 dev_err(DEV, "Resync already running!\n");
1558                 return;
1559         }
1560
1561         if (side == C_SYNC_TARGET) {
1562                 /* Since application IO was locked out during C_WF_BITMAP_T and
1563                    C_WF_SYNC_UUID, our data is still unmodified. Before going to C_SYNC_TARGET,
1564                    ask the before-resync-target handler whether we may make it inconsistent. */
1565                 r = drbd_khelper(mdev, "before-resync-target");
1566                 r = (r >> 8) & 0xff;
1567                 if (r > 0) {
1568                         dev_info(DEV, "before-resync-target handler returned %d, "
1569                              "dropping connection.\n", r);
1570                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1571                         return;
1572                 }
1573         } else /* C_SYNC_SOURCE */ {
1574                 r = drbd_khelper(mdev, "before-resync-source");
1575                 r = (r >> 8) & 0xff;
1576                 if (r > 0) {
1577                         if (r == 3) {
1578                                 dev_info(DEV, "before-resync-source handler returned %d, "
1579                                          "ignoring. Old userland tools?\n", r);
1580                         } else {
1581                                 dev_info(DEV, "before-resync-source handler returned %d, "
1582                                          "dropping connection.\n", r);
1583                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1584                                 return;
1585                         }
1586                 }
1587         }
1588
1589         drbd_state_lock(mdev);
1590         write_lock_irq(&global_state_lock);
1591         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1592                 write_unlock_irq(&global_state_lock);
1593                 drbd_state_unlock(mdev);
1594                 return;
1595         }
1596
1597         ns.i = mdev->state.i;
1598
1599         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1600
1601         ns.conn = side;
1602
1603         if (side == C_SYNC_TARGET)
1604                 ns.disk = D_INCONSISTENT;
1605         else /* side == C_SYNC_SOURCE */
1606                 ns.pdsk = D_INCONSISTENT;
1607
1608         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1609         ns = mdev->state;
1610
1611         if (ns.conn < C_CONNECTED)
1612                 r = SS_UNKNOWN_ERROR;
1613
1614         if (r == SS_SUCCESS) {
1615                 unsigned long tw = drbd_bm_total_weight(mdev);
1616                 unsigned long now = jiffies;
1617                 int i;
1618
1619                 mdev->rs_failed    = 0;
1620                 mdev->rs_paused    = 0;
1621                 mdev->rs_same_csum = 0;
1622                 mdev->rs_last_events = 0;
1623                 mdev->rs_last_sect_ev = 0;
1624                 mdev->rs_total     = tw;
1625                 mdev->rs_start     = now;
1626                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1627                         mdev->rs_mark_left[i] = tw;
1628                         mdev->rs_mark_time[i] = now;
1629                 }
1630                 _drbd_pause_after(mdev);
1631         }
1632         write_unlock_irq(&global_state_lock);
1633
1634         if (r == SS_SUCCESS) {
1635                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1636                      drbd_conn_str(ns.conn),
1637                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1638                      (unsigned long) mdev->rs_total);
1639                 if (side == C_SYNC_TARGET)
1640                         mdev->bm_resync_fo = 0;
1641
1642                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1643                  * with w_send_oos, or the sync target will get confused as to
1644                  * how many bits to resync.  We cannot always do that, because for an
1645                  * empty resync and protocol < 95, we need to do it here, as we call
1646                  * drbd_resync_finished from here in that case.
1647                  * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
1648                  * and from after_state_ch otherwise. */
1649                 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1650                         drbd_gen_and_send_sync_uuid(mdev);
1651
1652                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1653                         /* This still has a race (about when exactly the peers
1654                          * detect connection loss) that can lead to a full sync
1655                          * on next handshake. In 8.3.9 we fixed this with explicit
1656                          * resync-finished notifications, but the fix
1657                          * introduces a protocol change.  Sleeping for some
1658                          * time longer than the ping interval + timeout on the
1659                          * SyncSource, to give the SyncTarget the chance to
1660                          * detect connection loss, then waiting for a ping
1661                          * response (implicit in drbd_resync_finished) reduces
1662                          * the race considerably, but does not solve it. */
1663                         if (side == C_SYNC_SOURCE)
1664                                 schedule_timeout_interruptible(
1665                                         mdev->net_conf->ping_int * HZ +
1666                                         mdev->net_conf->ping_timeo*HZ/9);
1667                         drbd_resync_finished(mdev);
1668                 }
1669
1670                 drbd_rs_controller_reset(mdev);
1671                 /* ns.conn may already be != mdev->state.conn,
1672                  * we may have been paused in between, or become paused until
1673                  * the timer triggers.
1674                  * No matter, that is handled in resync_timer_fn() */
1675                 if (ns.conn == C_SYNC_TARGET)
1676                         mod_timer(&mdev->resync_timer, jiffies);
1677
1678                 drbd_md_sync(mdev);
1679         }
1680         put_ldev(mdev);
1681         drbd_state_unlock(mdev);
1682 }
1683
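/**
 * drbd_worker() - Main loop of the per-device worker thread
 * @thi:        the drbd_thread this worker runs as.
 *
 * Dequeues work items from mdev->data.work and runs their callbacks.
 * While waiting for new work, the data socket is uncorked (unless corking
 * is disabled); it is corked again before processing resumes.  A failing
 * callback while connected forces the connection into C_NETWORK_FAILURE.
 * On thread exit, the remaining work is cancelled, the receiver thread is
 * stopped, and the device is cleaned up.
 */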
1684 int drbd_worker(struct drbd_thread *thi)
1685 {
1686         struct drbd_conf *mdev = thi->mdev;
1687         struct drbd_work *w = NULL;
1688         LIST_HEAD(work_list);
1689         int intr = 0, i;
1690
1691         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1692
1693         while (get_t_state(thi) == Running) {
1694                 drbd_thread_current_set_cpu(mdev);
1695
1696                 if (down_trylock(&mdev->data.work.s)) {
1697                         mutex_lock(&mdev->data.mutex);
1698                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1699                                 drbd_tcp_uncork(mdev->data.socket);
1700                         mutex_unlock(&mdev->data.mutex);
1701
1702                         intr = down_interruptible(&mdev->data.work.s);
1703
1704                         mutex_lock(&mdev->data.mutex);
1705                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1706                                 drbd_tcp_cork(mdev->data.socket);
1707                         mutex_unlock(&mdev->data.mutex);
1708                 }
1709
1710                 if (intr) {
1711                         D_ASSERT(intr == -EINTR);
1712                         flush_signals(current);
1713                         ERR_IF (get_t_state(thi) == Running)
1714                                 continue;
1715                         break;
1716                 }
1717
1718                 if (get_t_state(thi) != Running)
1719                         break;
1720                 /* With this break, we have done a down() but not consumed
1721                    the entry from the list. The cleanup code takes care of
1722                    this...   */
1723
1724                 w = NULL;
1725                 spin_lock_irq(&mdev->data.work.q_lock);
1726                 ERR_IF(list_empty(&mdev->data.work.q)) {
1727                         /* Something is terribly wrong in our logic:
1728                          * we were able to down() the semaphore,
1729                          * but the list is empty... doh.
1730                          *
1731                          * What is the best thing to do now?
1732                          * Try again from scratch, restarting the receiver,
1733                          * asender, whatnot?  That could break things even worse,
1734                          * e.g. when we are primary, but have no good local data.
1735                          *
1736                          * For now, try to get away with just starting over this loop.
1737                          */
1738                         spin_unlock_irq(&mdev->data.work.q_lock);
1739                         continue;
1740                 }
1741                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1742                 list_del_init(&w->list);
1743                 spin_unlock_irq(&mdev->data.work.q_lock);
1744
1745                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1746                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1747                         if (mdev->state.conn >= C_CONNECTED)
1748                                 drbd_force_state(mdev,
1749                                                 NS(conn, C_NETWORK_FAILURE));
1750                 }
1751         }
1752         D_ASSERT(drbd_test_flag(mdev, DEVICE_DYING));
1753         D_ASSERT(drbd_test_flag(mdev, CONFIG_PENDING));
1754
1755         spin_lock_irq(&mdev->data.work.q_lock);
1756         i = 0;
1757         while (!list_empty(&mdev->data.work.q)) {
1758                 list_splice_init(&mdev->data.work.q, &work_list);
1759                 spin_unlock_irq(&mdev->data.work.q_lock);
1760
1761                 while (!list_empty(&work_list)) {
1762                         w = list_entry(work_list.next, struct drbd_work, list);
1763                         list_del_init(&w->list);
1764                         w->cb(mdev, w, 1);
1765                         i++; /* dead debugging code */
1766                 }
1767
1768                 spin_lock_irq(&mdev->data.work.q_lock);
1769         }
1770         sema_init(&mdev->data.work.s, 0);
1771         /* DANGEROUS race: if someone queued work while holding the spinlock,
1772          * but called up() only after dropping it, we could get an up() on the
1773          * semaphore without a corresponding list entry.
1774          * So don't do that.
1775          */
1776         spin_unlock_irq(&mdev->data.work.q_lock);
1777
1778         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1779         /* _drbd_set_state only uses stop_nowait.
1780          * wait here for the Exiting receiver. */
1781         drbd_thread_stop(&mdev->receiver);
1782         drbd_mdev_cleanup(mdev);
1783
1784         dev_info(DEV, "worker terminated\n");
1785
1786         drbd_clear_flag(mdev, DEVICE_DYING);
1787         drbd_clear_flag(mdev, CONFIG_PENDING);
1788         wake_up(&mdev->state_wait);
1789
1790         return 0;
1791 }