drbd: Turn tl_apply() into tl_abort_disk_io()
[firefly-linux-kernel-4.4.55.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in /proc/drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
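/* Usage note (editor's addition, not part of the original source): when drbd is
 * built as a module, these parameters can be set at load time, e.g.
 * "modprobe drbd minor_count=16 usermode_helper=/usr/local/sbin/drbdadm"
 * (the helper path is only an example value); when built into the kernel they
 * become boot parameters such as "drbd.minor_count=16", as the comment above
 * the module_param() calls explains. */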
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list, the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, if this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
198  * attached.
199  */
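/* Rough picture of the structure described above (editor's sketch):
 *
 *   tconn->oldest_tle --> epoch --> epoch --> ... --> epoch <-- tconn->newest_tle
 *                        (->next)  (->next)          (->next == NULL)
 *
 * where every epoch additionally carries its own circular, doubly linked
 * "requests" list of struct drbd_request objects (linked via tl_requests). */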
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218         INIT_LIST_HEAD(&tconn->barrier_acked_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
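        /* (editor's note) the GNU "x ?: y" shorthand above evaluates to x
         * unless x is 0; so if the increment wraps around to 0, barrier
         * number 1 is used instead and 0 is never sent on the wire. */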
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch object, this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307            of the write to the local disk. To avoid corruption of the
308            slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and violate write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_splice_init(&b->requests, &tconn->barrier_acked_requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL, b was the only barrier and becomes the new
328                    barrier. Therefore tconn->oldest_tle already points to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
373                 }
374                 tmp = b->next;
375
376                 if (n_writes) {
377                         if (what == RESEND) {
378                                 b->n_writes = n_writes;
379                                 if (b->w.cb == NULL) {
380                                         b->w.cb = w_send_barrier;
381                                         inc_ap_pending(b->w.mdev);
382                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383                                 }
384
385                                 drbd_queue_work(&tconn->data.work, &b->w);
386                         }
387                         pn = &b->next;
388                 } else {
389                         if (n_reads)
390                                 list_add(&carry_reads, &b->requests);
391                         /* there could still be requests on that ring list,
392                          * in case local io is still pending */
393                         list_del(&b->requests);
394
395                         /* dec_ap_pending corresponding to queue_barrier.
396                          * the newest barrier may not have been queued yet,
397                          * in which case w.cb is still NULL. */
398                         if (b->w.cb != NULL)
399                                 dec_ap_pending(b->w.mdev);
400
401                         if (b == tconn->newest_tle) {
402                                 /* recycle, but reinit! */
403                                 if (tmp != NULL)
404                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
405                                 INIT_LIST_HEAD(&b->requests);
406                                 list_splice(&carry_reads, &b->requests);
407                                 INIT_LIST_HEAD(&b->w.list);
408                                 b->w.cb = NULL;
409                                 b->br_number = net_random();
410                                 b->n_writes = 0;
411
412                                 *pn = b;
413                                 break;
414                         }
415                         *pn = tmp;
416                         kfree(b);
417                 }
418                 b = tmp;
419                 list_splice(&carry_reads, &b->requests);
420         }
421
422         /* Actions operating on the disk state, also want to work on
423            requests that got barrier acked. */
424         switch (what) {
425         case FAIL_FROZEN_DISK_IO:
426         case RESTART_FROZEN_DISK_IO:
427                 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
428                         req = list_entry(le, struct drbd_request, tl_requests);
429                         _req_mod(req, what);
430                 }
431         case CONNECTION_LOST_WHILE_PENDING:
432         case RESEND:
433                 break;
434         default:
435                 conn_err(tconn, "what = %d in _tl_restart()\n", what);
436         }
437 }
438
439 /**
440  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
441  * @tconn:      DRBD connection.
442  *
443  * This is called after the connection to the peer was lost. The storage covered
444  * by the requests on the transfer log gets marked as out of sync. Called from the
445  * receiver thread and the worker thread.
446  */
447 void tl_clear(struct drbd_tconn *tconn)
448 {
449         struct drbd_conf *mdev;
450         struct list_head *le, *tle;
451         struct drbd_request *r;
452         int vnr;
453
454         spin_lock_irq(&tconn->req_lock);
455
456         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
457
458         /* we expect this list to be empty. */
459         if (!list_empty(&tconn->out_of_sequence_requests))
460                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
461
462         /* but just in case, clean it up anyway! */
463         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
464                 r = list_entry(le, struct drbd_request, tl_requests);
465                 /* It would be nice to complete outside of spinlock.
466                  * But this is easier for now. */
467                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
468         }
469
470         /* ensure bit indicating barrier is required is clear */
471         rcu_read_lock();
472         idr_for_each_entry(&tconn->volumes, mdev, vnr)
473                 clear_bit(CREATE_BARRIER, &mdev->flags);
474         rcu_read_unlock();
475
476         spin_unlock_irq(&tconn->req_lock);
477 }
478
479 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
480 {
481         spin_lock_irq(&tconn->req_lock);
482         _tl_restart(tconn, what);
483         spin_unlock_irq(&tconn->req_lock);
484 }
485
486 /**
487  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
488  * @mdev:       DRBD device.
489  */
490 void tl_abort_disk_io(struct drbd_conf *mdev)
491 {
492         struct drbd_tconn *tconn = mdev->tconn;
493         struct drbd_tl_epoch *b;
494         struct list_head *le, *tle;
495         struct drbd_request *req;
496
497         spin_lock_irq(&tconn->req_lock);
498         b = tconn->oldest_tle;
499         while (b) {
500                 list_for_each_safe(le, tle, &b->requests) {
501                         req = list_entry(le, struct drbd_request, tl_requests);
502                         if (req->w.mdev == mdev)
503                                 _req_mod(req, ABORT_DISK_IO);
504                 }
505                 b = b->next;
506         }
507
508         list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
509                 req = list_entry(le, struct drbd_request, tl_requests);
510                 if (req->w.mdev == mdev)
511                         _req_mod(req, ABORT_DISK_IO);
512         }
513
514         spin_unlock_irq(&tconn->req_lock);
515 }
516
517 static int drbd_thread_setup(void *arg)
518 {
519         struct drbd_thread *thi = (struct drbd_thread *) arg;
520         struct drbd_tconn *tconn = thi->tconn;
521         unsigned long flags;
522         int retval;
523
524         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
525                  thi->name[0], thi->tconn->name);
526
527 restart:
528         retval = thi->function(thi);
529
530         spin_lock_irqsave(&thi->t_lock, flags);
531
532         /* if the receiver has been "EXITING", the last thing it did
533          * was set the conn state to "StandAlone",
534          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
535          * and the receiver thread will be "started".
536          * drbd_thread_start needs to set "RESTARTING" in that case.
537          * t_state check and assignment need to be within the same spinlock,
538          * so either thread_start sees EXITING, and can remap to RESTARTING,
539          * or thread_start sees NONE, and can proceed as normal.
540          */
541
542         if (thi->t_state == RESTARTING) {
543                 conn_info(tconn, "Restarting %s thread\n", thi->name);
544                 thi->t_state = RUNNING;
545                 spin_unlock_irqrestore(&thi->t_lock, flags);
546                 goto restart;
547         }
548
549         thi->task = NULL;
550         thi->t_state = NONE;
551         smp_mb();
552         complete_all(&thi->stop);
553         spin_unlock_irqrestore(&thi->t_lock, flags);
554
555         conn_info(tconn, "Terminating %s\n", current->comm);
556
557         /* Release mod reference taken when thread was started */
558
559         kref_put(&tconn->kref, &conn_destroy);
560         module_put(THIS_MODULE);
561         return retval;
562 }
563
564 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
565                              int (*func) (struct drbd_thread *), char *name)
566 {
567         spin_lock_init(&thi->t_lock);
568         thi->task    = NULL;
569         thi->t_state = NONE;
570         thi->function = func;
571         thi->tconn = tconn;
572         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
573 }
574
575 int drbd_thread_start(struct drbd_thread *thi)
576 {
577         struct drbd_tconn *tconn = thi->tconn;
578         struct task_struct *nt;
579         unsigned long flags;
580
581         /* is used from state engine doing drbd_thread_stop_nowait,
582          * while holding the req lock irqsave */
583         spin_lock_irqsave(&thi->t_lock, flags);
584
585         switch (thi->t_state) {
586         case NONE:
587                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
588                          thi->name, current->comm, current->pid);
589
590                 /* Get ref on module for thread - this is released when thread exits */
591                 if (!try_module_get(THIS_MODULE)) {
592                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
593                         spin_unlock_irqrestore(&thi->t_lock, flags);
594                         return false;
595                 }
596
597                 kref_get(&thi->tconn->kref);
598
599                 init_completion(&thi->stop);
600                 thi->reset_cpu_mask = 1;
601                 thi->t_state = RUNNING;
602                 spin_unlock_irqrestore(&thi->t_lock, flags);
603                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
604
605                 nt = kthread_create(drbd_thread_setup, (void *) thi,
606                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
607
608                 if (IS_ERR(nt)) {
609                         conn_err(tconn, "Couldn't start thread\n");
610
611                         kref_put(&tconn->kref, &conn_destroy);
612                         module_put(THIS_MODULE);
613                         return false;
614                 }
615                 spin_lock_irqsave(&thi->t_lock, flags);
616                 thi->task = nt;
617                 thi->t_state = RUNNING;
618                 spin_unlock_irqrestore(&thi->t_lock, flags);
619                 wake_up_process(nt);
620                 break;
621         case EXITING:
622                 thi->t_state = RESTARTING;
623                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
624                                 thi->name, current->comm, current->pid);
625                 /* fall through */
626         case RUNNING:
627         case RESTARTING:
628         default:
629                 spin_unlock_irqrestore(&thi->t_lock, flags);
630                 break;
631         }
632
633         return true;
634 }
635
636
637 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
638 {
639         unsigned long flags;
640
641         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
642
643         /* may be called from state engine, holding the req lock irqsave */
644         spin_lock_irqsave(&thi->t_lock, flags);
645
646         if (thi->t_state == NONE) {
647                 spin_unlock_irqrestore(&thi->t_lock, flags);
648                 if (restart)
649                         drbd_thread_start(thi);
650                 return;
651         }
652
653         if (thi->t_state != ns) {
654                 if (thi->task == NULL) {
655                         spin_unlock_irqrestore(&thi->t_lock, flags);
656                         return;
657                 }
658
659                 thi->t_state = ns;
660                 smp_mb();
661                 init_completion(&thi->stop);
662                 if (thi->task != current)
663                         force_sig(DRBD_SIGKILL, thi->task);
664         }
665
666         spin_unlock_irqrestore(&thi->t_lock, flags);
667
668         if (wait)
669                 wait_for_completion(&thi->stop);
670 }
671
672 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
673 {
674         struct drbd_thread *thi =
675                 task == tconn->receiver.task ? &tconn->receiver :
676                 task == tconn->asender.task  ? &tconn->asender :
677                 task == tconn->worker.task   ? &tconn->worker : NULL;
678
679         return thi;
680 }
681
682 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
683 {
684         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
685         return thi ? thi->name : task->comm;
686 }
687
688 int conn_lowest_minor(struct drbd_tconn *tconn)
689 {
690         struct drbd_conf *mdev;
691         int vnr = 0, m;
692
693         rcu_read_lock();
694         mdev = idr_get_next(&tconn->volumes, &vnr);
695         m = mdev ? mdev_to_minor(mdev) : -1;
696         rcu_read_unlock();
697
698         return m;
699 }
700
701 #ifdef CONFIG_SMP
702 /**
703  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
704  * @tconn:      DRBD connection.
705  *
706  * Forces all threads of a device onto the same CPU. This is beneficial for
707  * DRBD's performance. May be overwritten by user's configuration.
708  */
709 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
710 {
711         int ord, cpu;
712
713         /* user override. */
714         if (cpumask_weight(tconn->cpu_mask))
715                 return;
716
717         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
718         for_each_online_cpu(cpu) {
719                 if (ord-- == 0) {
720                         cpumask_set_cpu(cpu, tconn->cpu_mask);
721                         return;
722                 }
723         }
724         /* should not be reached */
725         cpumask_setall(tconn->cpu_mask);
726 }
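/* Worked example (editor's addition): with four CPUs online and no user
 * override, a connection whose lowest minor is 6 gets ord = 6 % 4 = 2, so the
 * loop above selects the third online CPU for all of that connection's threads. */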
727
728 /**
729  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
731  * @thi:        drbd_thread object
732  *
733  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
734  * prematurely.
735  */
736 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
737 {
738         struct task_struct *p = current;
739
740         if (!thi->reset_cpu_mask)
741                 return;
742         thi->reset_cpu_mask = 0;
743         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
744 }
745 #endif
746
747 /**
748  * drbd_header_size  -  size of a packet header
749  *
750  * The header size is a multiple of 8, so any payload following the header is
751  * word aligned on 64-bit architectures.  (The bitmap send and receive code
752  * relies on this.)
753  */
754 unsigned int drbd_header_size(struct drbd_tconn *tconn)
755 {
756         if (tconn->agreed_pro_version >= 100) {
757                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
758                 return sizeof(struct p_header100);
759         } else {
760                 BUILD_BUG_ON(sizeof(struct p_header80) !=
761                              sizeof(struct p_header95));
762                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
763                 return sizeof(struct p_header80);
764         }
765 }
766
767 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
768 {
769         h->magic   = cpu_to_be32(DRBD_MAGIC);
770         h->command = cpu_to_be16(cmd);
771         h->length  = cpu_to_be16(size);
772         return sizeof(struct p_header80);
773 }
774
775 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
776 {
777         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
778         h->command = cpu_to_be16(cmd);
779         h->length = cpu_to_be32(size);
780         return sizeof(struct p_header95);
781 }
782
783 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
784                                       int size, int vnr)
785 {
786         h->magic = cpu_to_be32(DRBD_MAGIC_100);
787         h->volume = cpu_to_be16(vnr);
788         h->command = cpu_to_be16(cmd);
789         h->length = cpu_to_be32(size);
790         h->pad = 0;
791         return sizeof(struct p_header100);
792 }
793
794 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
795                                    void *buffer, enum drbd_packet cmd, int size)
796 {
797         if (tconn->agreed_pro_version >= 100)
798                 return prepare_header100(buffer, cmd, size, vnr);
799         else if (tconn->agreed_pro_version >= 95 &&
800                  size > DRBD_MAX_SIZE_H80_PACKET)
801                 return prepare_header95(buffer, cmd, size);
802         else
803                 return prepare_header80(buffer, cmd, size);
804 }
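/* (editor's note) header selection as implemented above: peers speaking
 * protocol 100 or newer always get p_header100, which also carries the volume
 * number; older peers get p_header80, except that protocol 95-99 switches to
 * p_header95 for payloads larger than DRBD_MAX_SIZE_H80_PACKET, since the
 * 80-style header only has a 16-bit length field. */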
805
806 static void *__conn_prepare_command(struct drbd_tconn *tconn,
807                                     struct drbd_socket *sock)
808 {
809         if (!sock->socket)
810                 return NULL;
811         return sock->sbuf + drbd_header_size(tconn);
812 }
813
814 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
815 {
816         void *p;
817
818         mutex_lock(&sock->mutex);
819         p = __conn_prepare_command(tconn, sock);
820         if (!p)
821                 mutex_unlock(&sock->mutex);
822
823         return p;
824 }
825
826 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
827 {
828         return conn_prepare_command(mdev->tconn, sock);
829 }
830
831 static int __send_command(struct drbd_tconn *tconn, int vnr,
832                           struct drbd_socket *sock, enum drbd_packet cmd,
833                           unsigned int header_size, void *data,
834                           unsigned int size)
835 {
836         int msg_flags;
837         int err;
838
839         /*
840          * Called with @data == NULL and the size of the data blocks in @size
841          * for commands that send data blocks.  For those commands, omit the
842          * MSG_MORE flag: this will increase the likelihood that data blocks
843          * which are page aligned on the sender will end up page aligned on the
844          * receiver.
845          */
846         msg_flags = data ? MSG_MORE : 0;
847
848         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
849                                       header_size + size);
850         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
851                             msg_flags);
852         if (data && !err)
853                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
854         return err;
855 }
856
857 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
858                                enum drbd_packet cmd, unsigned int header_size,
859                                void *data, unsigned int size)
860 {
861         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
862 }
863
864 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
865                       enum drbd_packet cmd, unsigned int header_size,
866                       void *data, unsigned int size)
867 {
868         int err;
869
870         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
871         mutex_unlock(&sock->mutex);
872         return err;
873 }
874
875 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
876                       enum drbd_packet cmd, unsigned int header_size,
877                       void *data, unsigned int size)
878 {
879         int err;
880
881         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
882                              data, size);
883         mutex_unlock(&sock->mutex);
884         return err;
885 }
886
887 int drbd_send_ping(struct drbd_tconn *tconn)
888 {
889         struct drbd_socket *sock;
890
891         sock = &tconn->meta;
892         if (!conn_prepare_command(tconn, sock))
893                 return -EIO;
894         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
895 }
896
897 int drbd_send_ping_ack(struct drbd_tconn *tconn)
898 {
899         struct drbd_socket *sock;
900
901         sock = &tconn->meta;
902         if (!conn_prepare_command(tconn, sock))
903                 return -EIO;
904         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
905 }
906
907 int drbd_send_sync_param(struct drbd_conf *mdev)
908 {
909         struct drbd_socket *sock;
910         struct p_rs_param_95 *p;
911         int size;
912         const int apv = mdev->tconn->agreed_pro_version;
913         enum drbd_packet cmd;
914         struct net_conf *nc;
915         struct disk_conf *dc;
916
917         sock = &mdev->tconn->data;
918         p = drbd_prepare_command(mdev, sock);
919         if (!p)
920                 return -EIO;
921
922         rcu_read_lock();
923         nc = rcu_dereference(mdev->tconn->net_conf);
924
925         size = apv <= 87 ? sizeof(struct p_rs_param)
926                 : apv == 88 ? sizeof(struct p_rs_param)
927                         + strlen(nc->verify_alg) + 1
928                 : apv <= 94 ? sizeof(struct p_rs_param_89)
929                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
930
931         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
932
933         /* initialize verify_alg and csums_alg */
934         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
935
936         if (get_ldev(mdev)) {
937                 dc = rcu_dereference(mdev->ldev->disk_conf);
938                 p->resync_rate = cpu_to_be32(dc->resync_rate);
939                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
940                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
941                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
942                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
943                 put_ldev(mdev);
944         } else {
945                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
946                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
947                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
948                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
949                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
950         }
951
952         if (apv >= 88)
953                 strcpy(p->verify_alg, nc->verify_alg);
954         if (apv >= 89)
955                 strcpy(p->csums_alg, nc->csums_alg);
956         rcu_read_unlock();
957
958         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
959 }
960
961 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
962 {
963         struct drbd_socket *sock;
964         struct p_protocol *p;
965         struct net_conf *nc;
966         int size, cf;
967
968         sock = &tconn->data;
969         p = __conn_prepare_command(tconn, sock);
970         if (!p)
971                 return -EIO;
972
973         rcu_read_lock();
974         nc = rcu_dereference(tconn->net_conf);
975
976         if (nc->tentative && tconn->agreed_pro_version < 92) {
977                 rcu_read_unlock();
978                 mutex_unlock(&sock->mutex);
979                 conn_err(tconn, "--dry-run is not supported by peer");
980                 return -EOPNOTSUPP;
981         }
982
983         size = sizeof(*p);
984         if (tconn->agreed_pro_version >= 87)
985                 size += strlen(nc->integrity_alg) + 1;
986
987         p->protocol      = cpu_to_be32(nc->wire_protocol);
988         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
989         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
990         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
991         p->two_primaries = cpu_to_be32(nc->two_primaries);
992         cf = 0;
993         if (nc->discard_my_data)
994                 cf |= CF_DISCARD_MY_DATA;
995         if (nc->tentative)
996                 cf |= CF_DRY_RUN;
997         p->conn_flags    = cpu_to_be32(cf);
998
999         if (tconn->agreed_pro_version >= 87)
1000                 strcpy(p->integrity_alg, nc->integrity_alg);
1001         rcu_read_unlock();
1002
1003         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
1004 }
1005
1006 int drbd_send_protocol(struct drbd_tconn *tconn)
1007 {
1008         int err;
1009
1010         mutex_lock(&tconn->data.mutex);
1011         err = __drbd_send_protocol(tconn, P_PROTOCOL);
1012         mutex_unlock(&tconn->data.mutex);
1013
1014         return err;
1015 }
1016
1017 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1018 {
1019         struct drbd_socket *sock;
1020         struct p_uuids *p;
1021         int i;
1022
1023         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1024                 return 0;
1025
1026         sock = &mdev->tconn->data;
1027         p = drbd_prepare_command(mdev, sock);
1028         if (!p) {
1029                 put_ldev(mdev);
1030                 return -EIO;
1031         }
1032         for (i = UI_CURRENT; i < UI_SIZE; i++)
1033                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1034
1035         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1036         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1037         rcu_read_lock();
1038         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
1039         rcu_read_unlock();
1040         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1041         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1042         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1043
1044         put_ldev(mdev);
1045         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
1046 }
1047
1048 int drbd_send_uuids(struct drbd_conf *mdev)
1049 {
1050         return _drbd_send_uuids(mdev, 0);
1051 }
1052
1053 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1054 {
1055         return _drbd_send_uuids(mdev, 8);
1056 }
1057
1058 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1059 {
1060         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1061                 u64 *uuid = mdev->ldev->md.uuid;
1062                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1063                      text,
1064                      (unsigned long long)uuid[UI_CURRENT],
1065                      (unsigned long long)uuid[UI_BITMAP],
1066                      (unsigned long long)uuid[UI_HISTORY_START],
1067                      (unsigned long long)uuid[UI_HISTORY_END]);
1068                 put_ldev(mdev);
1069         } else {
1070                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1071                                 text,
1072                                 (unsigned long long)mdev->ed_uuid);
1073         }
1074 }
1075
1076 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1077 {
1078         struct drbd_socket *sock;
1079         struct p_rs_uuid *p;
1080         u64 uuid;
1081
1082         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1083
1084         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1085         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1086         drbd_print_uuids(mdev, "updated sync UUID");
1087         drbd_md_sync(mdev);
1088
1089         sock = &mdev->tconn->data;
1090         p = drbd_prepare_command(mdev, sock);
1091         if (p) {
1092                 p->uuid = cpu_to_be64(uuid);
1093                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1094         }
1095 }
1096
1097 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1098 {
1099         struct drbd_socket *sock;
1100         struct p_sizes *p;
1101         sector_t d_size, u_size;
1102         int q_order_type, max_bio_size;
1103
1104         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1105                 D_ASSERT(mdev->ldev->backing_bdev);
1106                 d_size = drbd_get_max_capacity(mdev->ldev);
1107                 rcu_read_lock();
1108                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1109                 rcu_read_unlock();
1110                 q_order_type = drbd_queue_order_type(mdev);
1111                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1112                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1113                 put_ldev(mdev);
1114         } else {
1115                 d_size = 0;
1116                 u_size = 0;
1117                 q_order_type = QUEUE_ORDERED_NONE;
1118                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1119         }
1120
1121         sock = &mdev->tconn->data;
1122         p = drbd_prepare_command(mdev, sock);
1123         if (!p)
1124                 return -EIO;
1125
1126         if (mdev->tconn->agreed_pro_version <= 94)
1127                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
1128         else if (mdev->tconn->agreed_pro_version < 100)
1129                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
1130
1131         p->d_size = cpu_to_be64(d_size);
1132         p->u_size = cpu_to_be64(u_size);
1133         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1134         p->max_bio_size = cpu_to_be32(max_bio_size);
1135         p->queue_order_type = cpu_to_be16(q_order_type);
1136         p->dds_flags = cpu_to_be16(flags);
1137         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1138 }
1139
1140 /**
1141  * drbd_send_state() - Sends the drbd state to the peer
1142  * @mdev:       DRBD device.
1143  */
1144 int drbd_send_state(struct drbd_conf *mdev)
1145 {
1146         struct drbd_socket *sock;
1147         struct p_state *p;
1148
1149         sock = &mdev->tconn->data;
1150         p = drbd_prepare_command(mdev, sock);
1151         if (!p)
1152                 return -EIO;
1153         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1154         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1155 }
1156
1157 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1158 {
1159         struct drbd_socket *sock;
1160         struct p_req_state *p;
1161
1162         sock = &mdev->tconn->data;
1163         p = drbd_prepare_command(mdev, sock);
1164         if (!p)
1165                 return -EIO;
1166         p->mask = cpu_to_be32(mask.i);
1167         p->val = cpu_to_be32(val.i);
1168         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1169
1170 }
1171
1172 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1173 {
1174         enum drbd_packet cmd;
1175         struct drbd_socket *sock;
1176         struct p_req_state *p;
1177
1178         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1179         sock = &tconn->data;
1180         p = conn_prepare_command(tconn, sock);
1181         if (!p)
1182                 return -EIO;
1183         p->mask = cpu_to_be32(mask.i);
1184         p->val = cpu_to_be32(val.i);
1185         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1186 }
1187
1188 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1189 {
1190         struct drbd_socket *sock;
1191         struct p_req_state_reply *p;
1192
1193         sock = &mdev->tconn->meta;
1194         p = drbd_prepare_command(mdev, sock);
1195         if (p) {
1196                 p->retcode = cpu_to_be32(retcode);
1197                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1198         }
1199 }
1200
1201 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1202 {
1203         struct drbd_socket *sock;
1204         struct p_req_state_reply *p;
1205         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1206
1207         sock = &tconn->meta;
1208         p = conn_prepare_command(tconn, sock);
1209         if (p) {
1210                 p->retcode = cpu_to_be32(retcode);
1211                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1212         }
1213 }
1214
1215 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1216 {
1217         BUG_ON(code & ~0xf);
1218         p->encoding = (p->encoding & ~0xf) | code;
1219 }
1220
1221 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1222 {
1223         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1224 }
1225
1226 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1227 {
1228         BUG_ON(n & ~0x7);
1229         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1230 }
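/* Layout of p->encoding as implied by the three helpers above (editor's note):
 *
 *   bit  7     - "start" flag: whether the first run length describes set bits
 *   bits 6..4  - number of pad bits at the end of the code string
 *   bits 3..0  - bitmap encoding code, e.g. RLE_VLI_Bits
 */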
1231
1232 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1233                          struct p_compressed_bm *p,
1234                          unsigned int size,
1235                          struct bm_xfer_ctx *c)
1236 {
1237         struct bitstream bs;
1238         unsigned long plain_bits;
1239         unsigned long tmp;
1240         unsigned long rl;
1241         unsigned len;
1242         unsigned toggle;
1243         int bits, use_rle;
1244
1245         /* may we use this feature? */
1246         rcu_read_lock();
1247         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1248         rcu_read_unlock();
1249         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1250                 return 0;
1251
1252         if (c->bit_offset >= c->bm_bits)
1253                 return 0; /* nothing to do. */
1254
1255         /* use at most thus many bytes */
1256         bitstream_init(&bs, p->code, size, 0);
1257         memset(p->code, 0, size);
1258         /* plain bits covered in this code string */
1259         plain_bits = 0;
1260
1261         /* p->encoding & 0x80 stores whether the first run length is set.
1262          * bit offset is implicit.
1263          * start with toggle == 2 to be able to tell the first iteration */
1264         toggle = 2;
1265
1266         /* see how many plain bits we can stuff into one packet
1267          * using RLE and VLI. */
1268         do {
1269                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1270                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1271                 if (tmp == -1UL)
1272                         tmp = c->bm_bits;
1273                 rl = tmp - c->bit_offset;
1274
1275                 if (toggle == 2) { /* first iteration */
1276                         if (rl == 0) {
1277                                 /* the first checked bit was set,
1278                                  * store start value, */
1279                                 dcbp_set_start(p, 1);
1280                                 /* but skip encoding of zero run length */
1281                                 toggle = !toggle;
1282                                 continue;
1283                         }
1284                         dcbp_set_start(p, 0);
1285                 }
1286
1287                 /* paranoia: catch zero runlength.
1288                  * can only happen if bitmap is modified while we scan it. */
1289                 if (rl == 0) {
1290                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1291                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1292                         return -1;
1293                 }
1294
1295                 bits = vli_encode_bits(&bs, rl);
1296                 if (bits == -ENOBUFS) /* buffer full */
1297                         break;
1298                 if (bits <= 0) {
1299                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1300                         return 0;
1301                 }
1302
1303                 toggle = !toggle;
1304                 plain_bits += rl;
1305                 c->bit_offset = tmp;
1306         } while (c->bit_offset < c->bm_bits);
1307
1308         len = bs.cur.b - p->code + !!bs.cur.bit;
1309
1310         if (plain_bits < (len << 3)) {
1311                 /* incompressible with this method.
1312                  * we need to rewind both word and bit position. */
1313                 c->bit_offset -= plain_bits;
1314                 bm_xfer_ctx_bit_to_word_offset(c);
1315                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1316                 return 0;
1317         }
1318
1319         /* RLE + VLI was able to compress it just fine.
1320          * update c->word_offset. */
1321         bm_xfer_ctx_bit_to_word_offset(c);
1322
1323         /* store pad_bits */
1324         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1325
1326         return len;
1327 }
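/* Worked example for the "plain_bits < (len << 3)" check above (editor's
 * sketch): if the RLE/VLI code string came out 100 bytes (800 bits) long but
 * only covered 500 plain bitmap bits, sending plain text would be cheaper, so
 * the transfer context is rewound and 0 is returned; had the same 100 bytes
 * covered 100000 plain bits, the compressed length would be returned instead. */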
1328
1329 /**
1330  * send_bitmap_rle_or_plain
1331  *
1332  * Return 0 when done, 1 when another iteration is needed, and a negative error
1333  * code upon failure.
1334  */
1335 static int
1336 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1337 {
1338         struct drbd_socket *sock = &mdev->tconn->data;
1339         unsigned int header_size = drbd_header_size(mdev->tconn);
1340         struct p_compressed_bm *p = sock->sbuf + header_size;
1341         int len, err;
1342
1343         len = fill_bitmap_rle_bits(mdev, p,
1344                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1345         if (len < 0)
1346                 return -EIO;
1347
1348         if (len) {
1349                 dcbp_set_code(p, RLE_VLI_Bits);
1350                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1351                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1352                                      NULL, 0);
1353                 c->packets[0]++;
1354                 c->bytes[0] += header_size + sizeof(*p) + len;
1355
1356                 if (c->bit_offset >= c->bm_bits)
1357                         len = 0; /* DONE */
1358         } else {
1359                 /* was not compressible.
1360                  * send a buffer full of plain text bits instead. */
1361                 unsigned int data_size;
1362                 unsigned long num_words;
1363                 unsigned long *p = sock->sbuf + header_size;
1364
1365                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1366                 num_words = min_t(size_t, data_size / sizeof(*p),
1367                                   c->bm_words - c->word_offset);
1368                 len = num_words * sizeof(*p);
1369                 if (len)
1370                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1371                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1372                 c->word_offset += num_words;
1373                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1374
1375                 c->packets[1]++;
1376                 c->bytes[1] += header_size + len;
1377
1378                 if (c->bit_offset > c->bm_bits)
1379                         c->bit_offset = c->bm_bits;
1380         }
1381         if (!err) {
1382                 if (len == 0) {
1383                         INFO_bm_xfer_stats(mdev, "send", c);
1384                         return 0;
1385                 } else
1386                         return 1;
1387         }
1388         return -EIO;
1389 }
1390
1391 /* See the comment at receive_bitmap() */
1392 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1393 {
1394         struct bm_xfer_ctx c;
1395         int err;
1396
1397         if (!expect(mdev->bitmap))
1398                 return false;
1399
1400         if (get_ldev(mdev)) {
1401                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1402                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1403                         drbd_bm_set_all(mdev);
1404                         if (drbd_bm_write(mdev)) {
1405                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1406                                  * but otherwise process as per normal - need to tell other
1407                                  * side that a full resync is required! */
1408                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1409                         } else {
1410                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1411                                 drbd_md_sync(mdev);
1412                         }
1413                 }
1414                 put_ldev(mdev);
1415         }
1416
1417         c = (struct bm_xfer_ctx) {
1418                 .bm_bits = drbd_bm_bits(mdev),
1419                 .bm_words = drbd_bm_words(mdev),
1420         };
1421
1422         do {
1423                 err = send_bitmap_rle_or_plain(mdev, &c);
1424         } while (err > 0);
1425
1426         return err == 0;
1427 }
1428
1429 int drbd_send_bitmap(struct drbd_conf *mdev)
1430 {
1431         struct drbd_socket *sock = &mdev->tconn->data;
1432         int err = -1;
1433
1434         mutex_lock(&sock->mutex);
1435         if (sock->socket)
1436                 err = !_drbd_send_bitmap(mdev);
1437         mutex_unlock(&sock->mutex);
1438         return err;
1439 }
1440
1441 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1442 {
1443         struct drbd_socket *sock;
1444         struct p_barrier_ack *p;
1445
1446         if (mdev->state.conn < C_CONNECTED)
1447                 return;
1448
1449         sock = &mdev->tconn->meta;
1450         p = drbd_prepare_command(mdev, sock);
1451         if (!p)
1452                 return;
1453         p->barrier = barrier_nr;
1454         p->set_size = cpu_to_be32(set_size);
1455         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1456 }
1457
1458 /**
1459  * _drbd_send_ack() - Sends an ack packet
1460  * @mdev:       DRBD device.
1461  * @cmd:        Packet command code.
1462  * @sector:     sector, needs to be in big endian byte order
1463  * @blksize:    size in byte, needs to be in big endian byte order
1464  * @block_id:   Id, big endian byte order
1465  */
1466 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1467                           u64 sector, u32 blksize, u64 block_id)
1468 {
1469         struct drbd_socket *sock;
1470         struct p_block_ack *p;
1471
1472         if (mdev->state.conn < C_CONNECTED)
1473                 return -EIO;
1474
1475         sock = &mdev->tconn->meta;
1476         p = drbd_prepare_command(mdev, sock);
1477         if (!p)
1478                 return -EIO;
1479         p->sector = sector;
1480         p->block_id = block_id;
1481         p->blksize = blksize;
1482         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1483         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1484 }
1485
1486 /* dp->sector and dp->block_id already/still in network byte order,
1487  * data_size is payload size according to dp->head,
1488  * and may need to be corrected for digest size. */
1489 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1490                       struct p_data *dp, int data_size)
1491 {
1492         if (mdev->tconn->peer_integrity_tfm)
1493                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1494         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1495                        dp->block_id);
1496 }
1497
1498 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1499                       struct p_block_req *rp)
1500 {
1501         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1502 }
1503
1504 /**
1505  * drbd_send_ack() - Sends an ack packet
1506  * @mdev:       DRBD device
1507  * @cmd:        packet command code
1508  * @peer_req:   peer request
1509  */
1510 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1511                   struct drbd_peer_request *peer_req)
1512 {
1513         return _drbd_send_ack(mdev, cmd,
1514                               cpu_to_be64(peer_req->i.sector),
1515                               cpu_to_be32(peer_req->i.size),
1516                               peer_req->block_id);
1517 }
1518
1519 /* This function misuses the block_id field to signal if the blocks
1520  * are in sync or not. */
1521 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1522                      sector_t sector, int blksize, u64 block_id)
1523 {
1524         return _drbd_send_ack(mdev, cmd,
1525                               cpu_to_be64(sector),
1526                               cpu_to_be32(blksize),
1527                               cpu_to_be64(block_id));
1528 }
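/* Illustration only, not a call site in this file: given the block_id reuse
 * described above, a checksum-based resync reply that found the block already
 * in sync could look roughly like this (P_RS_IS_IN_SYNC and ID_IN_SYNC are
 * defined elsewhere in drbd; treat the exact call as a sketch):
 *
 *     drbd_send_ack_ex(mdev, P_RS_IS_IN_SYNC,
 *                      peer_req->i.sector, peer_req->i.size, ID_IN_SYNC);
 */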
1529
1530 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1531                        sector_t sector, int size, u64 block_id)
1532 {
1533         struct drbd_socket *sock;
1534         struct p_block_req *p;
1535
1536         sock = &mdev->tconn->data;
1537         p = drbd_prepare_command(mdev, sock);
1538         if (!p)
1539                 return -EIO;
1540         p->sector = cpu_to_be64(sector);
1541         p->block_id = block_id;
1542         p->blksize = cpu_to_be32(size);
1543         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1544 }
1545
1546 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1547                             void *digest, int digest_size, enum drbd_packet cmd)
1548 {
1549         struct drbd_socket *sock;
1550         struct p_block_req *p;
1551
1552         /* FIXME: Put the digest into the preallocated socket buffer.  */
1553
1554         sock = &mdev->tconn->data;
1555         p = drbd_prepare_command(mdev, sock);
1556         if (!p)
1557                 return -EIO;
1558         p->sector = cpu_to_be64(sector);
1559         p->block_id = ID_SYNCER /* unused */;
1560         p->blksize = cpu_to_be32(size);
1561         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1562                                  digest, digest_size);
1563 }
1564
1565 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1566 {
1567         struct drbd_socket *sock;
1568         struct p_block_req *p;
1569
1570         sock = &mdev->tconn->data;
1571         p = drbd_prepare_command(mdev, sock);
1572         if (!p)
1573                 return -EIO;
1574         p->sector = cpu_to_be64(sector);
1575         p->block_id = ID_SYNCER /* unused */;
1576         p->blksize = cpu_to_be32(size);
1577         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1578 }
1579
1580 /* called on sndtimeo
1581  * returns false if we should retry,
1582  * true if we think connection is dead
1583  */
1584 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1585 {
1586         int drop_it;
1587         /* long elapsed = (long)(jiffies - mdev->last_received); */
1588
1589         drop_it =   tconn->meta.socket == sock
1590                 || !tconn->asender.task
1591                 || get_t_state(&tconn->asender) != RUNNING
1592                 || tconn->cstate < C_WF_REPORT_PARAMS;
1593
1594         if (drop_it)
1595                 return true;
1596
1597         drop_it = !--tconn->ko_count;
1598         if (!drop_it) {
1599                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1600                          current->comm, current->pid, tconn->ko_count);
1601                 request_ping(tconn);
1602         }
1603
1604         return drop_it; /* && (mdev->state == R_PRIMARY) */
1605 }
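/* Rough arithmetic (informational): per the socket table further below, the
 * send timeout on the data socket is about conf.timeout / 2, and each expiry
 * decrements ko_count, so a stalled peer is declared dead after roughly
 * ko_count * timeout / 2 with no progress - e.g. timeout = 60 (6 s, the value
 * is in units of 0.1 s) and ko-count = 7 gives up after about 21 seconds. */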
1606
1607 static void drbd_update_congested(struct drbd_tconn *tconn)
1608 {
1609         struct sock *sk = tconn->data.socket->sk;
1610         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1611                 set_bit(NET_CONGESTED, &tconn->flags);
1612 }
1613
1614 /* The idea of sendpage seems to be to put some kind of reference
1615  * to the page into the skb, and to hand it over to the NIC. In
1616  * this process get_page() gets called.
1617  *
1618  * As soon as the page was really sent over the network put_page()
1619  * gets called by some part of the network layer. [ NIC driver? ]
1620  *
1621  * [ get_page() / put_page() increment/decrement the count. If count
1622  *   reaches 0 the page will be freed. ]
1623  *
1624  * This works nicely with pages from FSs.
1625  * But this means that in protocol A we might signal IO completion too early!
1626  *
1627  * In order not to corrupt data during a resync we must make sure
1628  * that we do not reuse our own buffer pages (EEs) too early, therefore
1629  * we have the net_ee list.
1630  *
1631  * XFS still seems to have problems: it submits pages with page_count == 0!
1632  * As a workaround, we disable sendpage on pages
1633  * with page_count == 0 or PageSlab.
1634  */
1635 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1636                               int offset, size_t size, unsigned msg_flags)
1637 {
1638         struct socket *socket;
1639         void *addr;
1640         int err;
1641
1642         socket = mdev->tconn->data.socket;
1643         addr = kmap(page) + offset;
1644         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1645         kunmap(page);
1646         if (!err)
1647                 mdev->send_cnt += size >> 9;
1648         return err;
1649 }
1650
1651 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1652                     int offset, size_t size, unsigned msg_flags)
1653 {
1654         struct socket *socket = mdev->tconn->data.socket;
1655         mm_segment_t oldfs = get_fs();
1656         int len = size;
1657         int err = -EIO;
1658
1659         /* e.g. XFS meta- & log-data is in slab pages, which have a
1660          * page_count of 0 and/or have PageSlab() set.
1661          * we cannot use send_page for those, as that does get_page();
1662          * put_page(); and would cause either a VM_BUG directly, or
1663          * __page_cache_release a page that would actually still be referenced
1664          * by someone, leading to some obscure delayed Oops somewhere else. */
1665         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1666                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1667
1668         msg_flags |= MSG_NOSIGNAL;
1669         drbd_update_congested(mdev->tconn);
1670         set_fs(KERNEL_DS);
1671         do {
1672                 int sent;
1673
1674                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1675                 if (sent <= 0) {
1676                         if (sent == -EAGAIN) {
1677                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1678                                         break;
1679                                 continue;
1680                         }
1681                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1682                              __func__, (int)size, len, sent);
1683                         if (sent < 0)
1684                                 err = sent;
1685                         break;
1686                 }
1687                 len    -= sent;
1688                 offset += sent;
1689         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1690         set_fs(oldfs);
1691         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1692
1693         if (len == 0) {
1694                 err = 0;
1695                 mdev->send_cnt += size >> 9;
1696         }
1697         return err;
1698 }
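/* Sketch only (not part of the driver): the sendpage eligibility test used in
 * _drbd_send_page() could be read as a predicate built from the exact checks
 * above; the helper name is hypothetical:
 *
 *     static bool drbd_page_ok_for_sendpage(struct page *page)
 *     {
 *             return !disable_sendpage &&
 *                    page_count(page) >= 1 &&
 *                    !PageSlab(page);
 *     }
 *
 * Pages that fail the test fall back to _drbd_no_send_page(), which kmaps the
 * page and copies the data through the regular sendmsg path. */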
1699
1700 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1701 {
1702         struct bio_vec *bvec;
1703         int i;
1704         /* hint all but last page with MSG_MORE */
1705         __bio_for_each_segment(bvec, bio, i, 0) {
1706                 int err;
1707
1708                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1709                                          bvec->bv_offset, bvec->bv_len,
1710                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1711                 if (err)
1712                         return err;
1713         }
1714         return 0;
1715 }
1716
1717 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1718 {
1719         struct bio_vec *bvec;
1720         int i;
1721         /* hint all but last page with MSG_MORE */
1722         __bio_for_each_segment(bvec, bio, i, 0) {
1723                 int err;
1724
1725                 err = _drbd_send_page(mdev, bvec->bv_page,
1726                                       bvec->bv_offset, bvec->bv_len,
1727                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1728                 if (err)
1729                         return err;
1730         }
1731         return 0;
1732 }
1733
1734 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1735                             struct drbd_peer_request *peer_req)
1736 {
1737         struct page *page = peer_req->pages;
1738         unsigned len = peer_req->i.size;
1739         int err;
1740
1741         /* hint all but last page with MSG_MORE */
1742         page_chain_for_each(page) {
1743                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1744
1745                 err = _drbd_send_page(mdev, page, 0, l,
1746                                       page_chain_next(page) ? MSG_MORE : 0);
1747                 if (err)
1748                         return err;
1749                 len -= l;
1750         }
1751         return 0;
1752 }
1753
1754 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1755 {
1756         if (mdev->tconn->agreed_pro_version >= 95)
1757                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1758                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1759                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1760                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1761         else
1762                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1763 }
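/* Example (informational): with agreed_pro_version >= 95, a bio submitted with
 * REQ_FLUSH | REQ_FUA maps to dp_flags containing DP_FLUSH | DP_FUA; against
 * an older peer only the REQ_SYNC -> DP_RW_SYNC hint survives, matching the
 * fallback branch above. */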
1764
1765 /* Used to send write requests
1766  * R_PRIMARY -> Peer    (P_DATA)
1767  */
1768 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1769 {
1770         struct drbd_socket *sock;
1771         struct p_data *p;
1772         unsigned int dp_flags = 0;
1773         int dgs;
1774         int err;
1775
1776         sock = &mdev->tconn->data;
1777         p = drbd_prepare_command(mdev, sock);
1778         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1779                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1780
1781         if (!p)
1782                 return -EIO;
1783         p->sector = cpu_to_be64(req->i.sector);
1784         p->block_id = (unsigned long)req;
1785         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1786         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1787         if (mdev->state.conn >= C_SYNC_SOURCE &&
1788             mdev->state.conn <= C_PAUSED_SYNC_T)
1789                 dp_flags |= DP_MAY_SET_IN_SYNC;
1790         if (mdev->tconn->agreed_pro_version >= 100) {
1791                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1792                         dp_flags |= DP_SEND_RECEIVE_ACK;
1793                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1794                         dp_flags |= DP_SEND_WRITE_ACK;
1795         }
1796         p->dp_flags = cpu_to_be32(dp_flags);
1797         if (dgs)
1798                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1799         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1800         if (!err) {
1801                 /* For protocol A, we have to memcpy the payload into
1802                  * socket buffers, as we may complete right away
1803                  * as soon as we handed it over to tcp, at which point the data
1804                  * pages may become invalid.
1805                  *
1806                  * For data-integrity enabled, we copy it as well, so we can be
1807                  * sure that even if the bio pages may still be modified, it
1808                  * won't change the data on the wire, thus if the digest checks
1809                  * out ok after sending on this side, but does not fit on the
1810                  * receiving side, we sure have detected corruption elsewhere.
1811                  */
1812                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1813                         err = _drbd_send_bio(mdev, req->master_bio);
1814                 else
1815                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1816
1817                 /* double check digest, sometimes buffers have been modified in flight. */
1818                 if (dgs > 0 && dgs <= 64) {
1819                         /* 64 byte, 512 bit, is the largest digest size
1820                          * currently supported in kernel crypto. */
1821                         unsigned char digest[64];
1822                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1823                         if (memcmp(p + 1, digest, dgs)) {
1824                                 dev_warn(DEV,
1825                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1826                                         (unsigned long long)req->i.sector, req->i.size);
1827                         }
1828                 } /* else if (dgs > 64) {
1829                      ... Be noisy about digest too large ...
1830                 } */
1831         }
1832         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1833
1834         return err;
1835 }
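/* Summary of the payload-copy decision above (informational):
 *  - no RQ_EXP_RECEIVE_ACK / RQ_EXP_WRITE_ACK expected (protocol A), or a
 *    data-integrity digest is in use: the payload goes through
 *    _drbd_send_bio(), i.e. it is copied into the socket buffers so later
 *    modification of the bio pages cannot change what is on the wire.
 *  - otherwise: _drbd_send_zc_bio(), zero copy via sendpage. */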
1836
1837 /* answer packet, used to send data back for read requests:
1838  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1839  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1840  */
1841 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1842                     struct drbd_peer_request *peer_req)
1843 {
1844         struct drbd_socket *sock;
1845         struct p_data *p;
1846         int err;
1847         int dgs;
1848
1849         sock = &mdev->tconn->data;
1850         p = drbd_prepare_command(mdev, sock);
1851
1852         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1853                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1854
1855         if (!p)
1856                 return -EIO;
1857         p->sector = cpu_to_be64(peer_req->i.sector);
1858         p->block_id = peer_req->block_id;
1859         p->seq_num = 0;  /* unused */
1860         if (dgs)
1861                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1862         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1863         if (!err)
1864                 err = _drbd_send_zc_ee(mdev, peer_req);
1865         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1866
1867         return err;
1868 }
1869
1870 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1871 {
1872         struct drbd_socket *sock;
1873         struct p_block_desc *p;
1874
1875         sock = &mdev->tconn->data;
1876         p = drbd_prepare_command(mdev, sock);
1877         if (!p)
1878                 return -EIO;
1879         p->sector = cpu_to_be64(req->i.sector);
1880         p->blksize = cpu_to_be32(req->i.size);
1881         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1882 }
1883
1884 /*
1885   drbd_send distinguishes two cases:
1886
1887   Packets sent via the data socket "sock"
1888   and packets sent via the meta data socket "msock"
1889
1890                     sock                      msock
1891   -----------------+-------------------------+------------------------------
1892   timeout           conf.timeout / 2          conf.timeout / 2
1893   timeout action    send a ping via msock     Abort communication
1894                                               and close all sockets
1895 */
1896
1897 /*
1898  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1899  */
1900 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1901               void *buf, size_t size, unsigned msg_flags)
1902 {
1903         struct kvec iov;
1904         struct msghdr msg;
1905         int rv, sent = 0;
1906
1907         if (!sock)
1908                 return -EBADR;
1909
1910         /* THINK  if (signal_pending) return ... ? */
1911
1912         iov.iov_base = buf;
1913         iov.iov_len  = size;
1914
1915         msg.msg_name       = NULL;
1916         msg.msg_namelen    = 0;
1917         msg.msg_control    = NULL;
1918         msg.msg_controllen = 0;
1919         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1920
1921         if (sock == tconn->data.socket) {
1922                 rcu_read_lock();
1923                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1924                 rcu_read_unlock();
1925                 drbd_update_congested(tconn);
1926         }
1927         do {
1928                 /* STRANGE
1929                  * tcp_sendmsg does _not_ use its size parameter at all ?
1930                  *
1931                  * -EAGAIN on timeout, -EINTR on signal.
1932                  */
1933 /* THINK
1934  * do we need to block DRBD_SIG if sock == &meta.socket ??
1935  * otherwise wake_asender() might interrupt some send_*Ack !
1936  */
1937                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1938                 if (rv == -EAGAIN) {
1939                         if (we_should_drop_the_connection(tconn, sock))
1940                                 break;
1941                         else
1942                                 continue;
1943                 }
1944                 if (rv == -EINTR) {
1945                         flush_signals(current);
1946                         rv = 0;
1947                 }
1948                 if (rv < 0)
1949                         break;
1950                 sent += rv;
1951                 iov.iov_base += rv;
1952                 iov.iov_len  -= rv;
1953         } while (sent < size);
1954
1955         if (sock == tconn->data.socket)
1956                 clear_bit(NET_CONGESTED, &tconn->flags);
1957
1958         if (rv <= 0) {
1959                 if (rv != -EAGAIN) {
1960                         conn_err(tconn, "%s_sendmsg returned %d\n",
1961                                  sock == tconn->meta.socket ? "msock" : "sock",
1962                                  rv);
1963                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1964                 } else
1965                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1966         }
1967
1968         return sent;
1969 }
1970
1971 /**
1972  * drbd_send_all  -  Send an entire buffer
1973  *
1974  * Returns 0 upon success and a negative error value otherwise.
1975  */
1976 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1977                   size_t size, unsigned msg_flags)
1978 {
1979         int err;
1980
1981         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1982         if (err < 0)
1983                 return err;
1984         if (err != size)
1985                 return -EIO;
1986         return 0;
1987 }
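/* Usage sketch (illustrative only): callers serialize on the per-socket mutex,
 * following the same pattern drbd_send_bitmap() uses further above:
 *
 *     mutex_lock(&sock->mutex);
 *     if (sock->socket)
 *             err = drbd_send_all(tconn, sock->socket, buf, len, 0);
 *     mutex_unlock(&sock->mutex);
 */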
1988
1989 static int drbd_open(struct block_device *bdev, fmode_t mode)
1990 {
1991         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1992         unsigned long flags;
1993         int rv = 0;
1994
1995         mutex_lock(&drbd_main_mutex);
1996         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1997         /* to have a stable mdev->state.role
1998          * and no race with updating open_cnt */
1999
2000         if (mdev->state.role != R_PRIMARY) {
2001                 if (mode & FMODE_WRITE)
2002                         rv = -EROFS;
2003                 else if (!allow_oos)
2004                         rv = -EMEDIUMTYPE;
2005         }
2006
2007         if (!rv)
2008                 mdev->open_cnt++;
2009         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2010         mutex_unlock(&drbd_main_mutex);
2011
2012         return rv;
2013 }
2014
2015 static int drbd_release(struct gendisk *gd, fmode_t mode)
2016 {
2017         struct drbd_conf *mdev = gd->private_data;
2018         mutex_lock(&drbd_main_mutex);
2019         mdev->open_cnt--;
2020         mutex_unlock(&drbd_main_mutex);
2021         return 0;
2022 }
2023
2024 static void drbd_set_defaults(struct drbd_conf *mdev)
2025 {
2026         /* Beware! The actual layout differs
2027          * between big endian and little endian */
2028         mdev->state = (union drbd_dev_state) {
2029                 { .role = R_SECONDARY,
2030                   .peer = R_UNKNOWN,
2031                   .conn = C_STANDALONE,
2032                   .disk = D_DISKLESS,
2033                   .pdsk = D_UNKNOWN,
2034                 } };
2035 }
2036
2037 void drbd_init_set_defaults(struct drbd_conf *mdev)
2038 {
2039         /* the memset(,0,) did most of this.
2040          * note: only assignments, no allocation in here */
2041
2042         drbd_set_defaults(mdev);
2043
2044         atomic_set(&mdev->ap_bio_cnt, 0);
2045         atomic_set(&mdev->ap_pending_cnt, 0);
2046         atomic_set(&mdev->rs_pending_cnt, 0);
2047         atomic_set(&mdev->unacked_cnt, 0);
2048         atomic_set(&mdev->local_cnt, 0);
2049         atomic_set(&mdev->pp_in_use_by_net, 0);
2050         atomic_set(&mdev->rs_sect_in, 0);
2051         atomic_set(&mdev->rs_sect_ev, 0);
2052         atomic_set(&mdev->ap_in_flight, 0);
2053         atomic_set(&mdev->md_io_in_use, 0);
2054
2055         mutex_init(&mdev->own_state_mutex);
2056         mdev->state_mutex = &mdev->own_state_mutex;
2057
2058         spin_lock_init(&mdev->al_lock);
2059         spin_lock_init(&mdev->peer_seq_lock);
2060         spin_lock_init(&mdev->epoch_lock);
2061
2062         INIT_LIST_HEAD(&mdev->active_ee);
2063         INIT_LIST_HEAD(&mdev->sync_ee);
2064         INIT_LIST_HEAD(&mdev->done_ee);
2065         INIT_LIST_HEAD(&mdev->read_ee);
2066         INIT_LIST_HEAD(&mdev->net_ee);
2067         INIT_LIST_HEAD(&mdev->resync_reads);
2068         INIT_LIST_HEAD(&mdev->resync_work.list);
2069         INIT_LIST_HEAD(&mdev->unplug_work.list);
2070         INIT_LIST_HEAD(&mdev->go_diskless.list);
2071         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2072         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2073         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2074
2075         mdev->resync_work.cb  = w_resync_timer;
2076         mdev->unplug_work.cb  = w_send_write_hint;
2077         mdev->go_diskless.cb  = w_go_diskless;
2078         mdev->md_sync_work.cb = w_md_sync;
2079         mdev->bm_io_work.w.cb = w_bitmap_io;
2080         mdev->start_resync_work.cb = w_start_resync;
2081
2082         mdev->resync_work.mdev  = mdev;
2083         mdev->unplug_work.mdev  = mdev;
2084         mdev->go_diskless.mdev  = mdev;
2085         mdev->md_sync_work.mdev = mdev;
2086         mdev->bm_io_work.w.mdev = mdev;
2087         mdev->start_resync_work.mdev = mdev;
2088
2089         init_timer(&mdev->resync_timer);
2090         init_timer(&mdev->md_sync_timer);
2091         init_timer(&mdev->start_resync_timer);
2092         init_timer(&mdev->request_timer);
2093         mdev->resync_timer.function = resync_timer_fn;
2094         mdev->resync_timer.data = (unsigned long) mdev;
2095         mdev->md_sync_timer.function = md_sync_timer_fn;
2096         mdev->md_sync_timer.data = (unsigned long) mdev;
2097         mdev->start_resync_timer.function = start_resync_timer_fn;
2098         mdev->start_resync_timer.data = (unsigned long) mdev;
2099         mdev->request_timer.function = request_timer_fn;
2100         mdev->request_timer.data = (unsigned long) mdev;
2101
2102         init_waitqueue_head(&mdev->misc_wait);
2103         init_waitqueue_head(&mdev->state_wait);
2104         init_waitqueue_head(&mdev->ee_wait);
2105         init_waitqueue_head(&mdev->al_wait);
2106         init_waitqueue_head(&mdev->seq_wait);
2107
2108         mdev->write_ordering = WO_bdev_flush;
2109         mdev->resync_wenr = LC_FREE;
2110         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2111         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2112 }
2113
2114 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2115 {
2116         int i;
2117         if (mdev->tconn->receiver.t_state != NONE)
2118                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2119                                 mdev->tconn->receiver.t_state);
2120
2121         /* no need to lock it, I'm the only thread alive */
2122         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2123                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2124         mdev->al_writ_cnt  =
2125         mdev->bm_writ_cnt  =
2126         mdev->read_cnt     =
2127         mdev->recv_cnt     =
2128         mdev->send_cnt     =
2129         mdev->writ_cnt     =
2130         mdev->p_size       =
2131         mdev->rs_start     =
2132         mdev->rs_total     =
2133         mdev->rs_failed    = 0;
2134         mdev->rs_last_events = 0;
2135         mdev->rs_last_sect_ev = 0;
2136         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2137                 mdev->rs_mark_left[i] = 0;
2138                 mdev->rs_mark_time[i] = 0;
2139         }
2140         D_ASSERT(mdev->tconn->net_conf == NULL);
2141
2142         drbd_set_my_capacity(mdev, 0);
2143         if (mdev->bitmap) {
2144                 /* maybe never allocated. */
2145                 drbd_bm_resize(mdev, 0, 1);
2146                 drbd_bm_cleanup(mdev);
2147         }
2148
2149         drbd_free_bc(mdev->ldev);
2150         mdev->ldev = NULL;
2151
2152         clear_bit(AL_SUSPENDED, &mdev->flags);
2153
2154         D_ASSERT(list_empty(&mdev->active_ee));
2155         D_ASSERT(list_empty(&mdev->sync_ee));
2156         D_ASSERT(list_empty(&mdev->done_ee));
2157         D_ASSERT(list_empty(&mdev->read_ee));
2158         D_ASSERT(list_empty(&mdev->net_ee));
2159         D_ASSERT(list_empty(&mdev->resync_reads));
2160         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2161         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2162         D_ASSERT(list_empty(&mdev->resync_work.list));
2163         D_ASSERT(list_empty(&mdev->unplug_work.list));
2164         D_ASSERT(list_empty(&mdev->go_diskless.list));
2165
2166         drbd_set_defaults(mdev);
2167 }
2168
2169
2170 static void drbd_destroy_mempools(void)
2171 {
2172         struct page *page;
2173
2174         while (drbd_pp_pool) {
2175                 page = drbd_pp_pool;
2176                 drbd_pp_pool = (struct page *)page_private(page);
2177                 __free_page(page);
2178                 drbd_pp_vacant--;
2179         }
2180
2181         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2182
2183         if (drbd_md_io_bio_set)
2184                 bioset_free(drbd_md_io_bio_set);
2185         if (drbd_md_io_page_pool)
2186                 mempool_destroy(drbd_md_io_page_pool);
2187         if (drbd_ee_mempool)
2188                 mempool_destroy(drbd_ee_mempool);
2189         if (drbd_request_mempool)
2190                 mempool_destroy(drbd_request_mempool);
2191         if (drbd_ee_cache)
2192                 kmem_cache_destroy(drbd_ee_cache);
2193         if (drbd_request_cache)
2194                 kmem_cache_destroy(drbd_request_cache);
2195         if (drbd_bm_ext_cache)
2196                 kmem_cache_destroy(drbd_bm_ext_cache);
2197         if (drbd_al_ext_cache)
2198                 kmem_cache_destroy(drbd_al_ext_cache);
2199
2200         drbd_md_io_bio_set   = NULL;
2201         drbd_md_io_page_pool = NULL;
2202         drbd_ee_mempool      = NULL;
2203         drbd_request_mempool = NULL;
2204         drbd_ee_cache        = NULL;
2205         drbd_request_cache   = NULL;
2206         drbd_bm_ext_cache    = NULL;
2207         drbd_al_ext_cache    = NULL;
2208
2209         return;
2210 }
2211
2212 static int drbd_create_mempools(void)
2213 {
2214         struct page *page;
2215         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2216         int i;
2217
2218         /* prepare our caches and mempools */
2219         drbd_request_mempool = NULL;
2220         drbd_ee_cache        = NULL;
2221         drbd_request_cache   = NULL;
2222         drbd_bm_ext_cache    = NULL;
2223         drbd_al_ext_cache    = NULL;
2224         drbd_pp_pool         = NULL;
2225         drbd_md_io_page_pool = NULL;
2226         drbd_md_io_bio_set   = NULL;
2227
2228         /* caches */
2229         drbd_request_cache = kmem_cache_create(
2230                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2231         if (drbd_request_cache == NULL)
2232                 goto Enomem;
2233
2234         drbd_ee_cache = kmem_cache_create(
2235                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2236         if (drbd_ee_cache == NULL)
2237                 goto Enomem;
2238
2239         drbd_bm_ext_cache = kmem_cache_create(
2240                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2241         if (drbd_bm_ext_cache == NULL)
2242                 goto Enomem;
2243
2244         drbd_al_ext_cache = kmem_cache_create(
2245                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2246         if (drbd_al_ext_cache == NULL)
2247                 goto Enomem;
2248
2249         /* mempools */
2250         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2251         if (drbd_md_io_bio_set == NULL)
2252                 goto Enomem;
2253
2254         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2255         if (drbd_md_io_page_pool == NULL)
2256                 goto Enomem;
2257
2258         drbd_request_mempool = mempool_create(number,
2259                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2260         if (drbd_request_mempool == NULL)
2261                 goto Enomem;
2262
2263         drbd_ee_mempool = mempool_create(number,
2264                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2265         if (drbd_ee_mempool == NULL)
2266                 goto Enomem;
2267
2268         /* drbd's page pool */
2269         spin_lock_init(&drbd_pp_lock);
2270
2271         for (i = 0; i < number; i++) {
2272                 page = alloc_page(GFP_HIGHUSER);
2273                 if (!page)
2274                         goto Enomem;
2275                 set_page_private(page, (unsigned long)drbd_pp_pool);
2276                 drbd_pp_pool = page;
2277         }
2278         drbd_pp_vacant = number;
2279
2280         return 0;
2281
2282 Enomem:
2283         drbd_destroy_mempools(); /* in case we allocated some */
2284         return -ENOMEM;
2285 }
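/* Sizing example (illustrative; assumes 4 KiB pages and the 1 MiB
 * DRBD_MAX_BIO_SIZE of this driver version): number works out to
 * 256 * minor_count pages, i.e. about 1 MiB of pre-allocated page-pool memory
 * per configured minor, and the request and peer-request mempools are created
 * with the same element count. */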
2286
2287 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2288         void *unused)
2289 {
2290         /* just so we have it.  you never know what interesting things we
2291          * might want to do here some day...
2292          */
2293
2294         return NOTIFY_DONE;
2295 }
2296
2297 static struct notifier_block drbd_notifier = {
2298         .notifier_call = drbd_notify_sys,
2299 };
2300
2301 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2302 {
2303         int rr;
2304
2305         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2306         if (rr)
2307                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2308
2309         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2310         if (rr)
2311                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2312
2313         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2314         if (rr)
2315                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2316
2317         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2318         if (rr)
2319                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2320
2321         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2322         if (rr)
2323                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2324 }
2325
2326 /* caution. no locking. */
2327 void drbd_minor_destroy(struct kref *kref)
2328 {
2329         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2330         struct drbd_tconn *tconn = mdev->tconn;
2331
2332         del_timer_sync(&mdev->request_timer);
2333
2334         /* paranoia asserts */
2335         D_ASSERT(mdev->open_cnt == 0);
2336         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2337         /* end paranoia asserts */
2338
2339         /* cleanup stuff that may have been allocated during
2340          * device (re-)configuration or state changes */
2341
2342         if (mdev->this_bdev)
2343                 bdput(mdev->this_bdev);
2344
2345         drbd_free_bc(mdev->ldev);
2346         mdev->ldev = NULL;
2347
2348         drbd_release_all_peer_reqs(mdev);
2349
2350         lc_destroy(mdev->act_log);
2351         lc_destroy(mdev->resync);
2352
2353         kfree(mdev->p_uuid);
2354         /* mdev->p_uuid = NULL; */
2355
2356         kfree(mdev->current_epoch);
2357         if (mdev->bitmap) /* should no longer be there. */
2358                 drbd_bm_cleanup(mdev);
2359         __free_page(mdev->md_io_page);
2360         put_disk(mdev->vdisk);
2361         blk_cleanup_queue(mdev->rq_queue);
2362         kfree(mdev->rs_plan_s);
2363         kfree(mdev);
2364
2365         kref_put(&tconn->kref, &conn_destroy);
2366 }
2367
2368 static void drbd_cleanup(void)
2369 {
2370         unsigned int i;
2371         struct drbd_conf *mdev;
2372         struct drbd_tconn *tconn, *tmp;
2373
2374         unregister_reboot_notifier(&drbd_notifier);
2375
2376         /* first remove proc,
2377          * drbdsetup uses its presence to detect
2378          * whether DRBD is loaded.
2379          * If we get stuck in proc removal,
2380          * but have netlink already deregistered,
2381          * some drbdsetup commands may wait forever
2382          * for an answer.
2383          */
2384         if (drbd_proc)
2385                 remove_proc_entry("drbd", NULL);
2386
2387         drbd_genl_unregister();
2388
2389         idr_for_each_entry(&minors, mdev, i) {
2390                 idr_remove(&minors, mdev_to_minor(mdev));
2391                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2392                 del_gendisk(mdev->vdisk);
2393                 /* synchronize_rcu(); No other threads running at this point */
2394                 kref_put(&mdev->kref, &drbd_minor_destroy);
2395         }
2396
2397         /* not _rcu, since there is no other updater anymore; genl is already unregistered */
2398         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2399                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2400                 /* synchronize_rcu(); */
2401                 kref_put(&tconn->kref, &conn_destroy);
2402         }
2403
2404         drbd_destroy_mempools();
2405         unregister_blkdev(DRBD_MAJOR, "drbd");
2406
2407         idr_destroy(&minors);
2408
2409         printk(KERN_INFO "drbd: module cleanup done.\n");
2410 }
2411
2412 /**
2413  * drbd_congested() - Callback for pdflush
2414  * @congested_data:     User data
2415  * @bdi_bits:           Bits pdflush is currently interested in
2416  *
2417  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2418  */
2419 static int drbd_congested(void *congested_data, int bdi_bits)
2420 {
2421         struct drbd_conf *mdev = congested_data;
2422         struct request_queue *q;
2423         char reason = '-';
2424         int r = 0;
2425
2426         if (!may_inc_ap_bio(mdev)) {
2427                 /* DRBD has frozen IO */
2428                 r = bdi_bits;
2429                 reason = 'd';
2430                 goto out;
2431         }
2432
2433         if (get_ldev(mdev)) {
2434                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2435                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2436                 put_ldev(mdev);
2437                 if (r)
2438                         reason = 'b';
2439         }
2440
2441         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2442                 r |= (1 << BDI_async_congested);
2443                 reason = reason == 'b' ? 'a' : 'n';
2444         }
2445
2446 out:
2447         mdev->congestion_reason = reason;
2448         return r;
2449 }
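/* Legend for congestion_reason (informational, derived from the code above):
 *   'd'  IO is currently suspended by DRBD itself (may_inc_ap_bio() failed)
 *   'b'  the local backing device is congested
 *   'a'  backing device congested and the network send path congested as well
 *   'n'  only the network send path (data socket) is congested
 *   '-'  not congested */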
2450
2451 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2452 {
2453         sema_init(&wq->s, 0);
2454         spin_lock_init(&wq->q_lock);
2455         INIT_LIST_HEAD(&wq->q);
2456 }
2457
2458 struct drbd_tconn *conn_get_by_name(const char *name)
2459 {
2460         struct drbd_tconn *tconn;
2461
2462         if (!name || !name[0])
2463                 return NULL;
2464
2465         rcu_read_lock();
2466         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2467                 if (!strcmp(tconn->name, name)) {
2468                         kref_get(&tconn->kref);
2469                         goto found;
2470                 }
2471         }
2472         tconn = NULL;
2473 found:
2474         rcu_read_unlock();
2475         return tconn;
2476 }
2477
2478 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2479                                      void *peer_addr, int peer_addr_len)
2480 {
2481         struct drbd_tconn *tconn;
2482
2483         rcu_read_lock();
2484         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2485                 if (tconn->my_addr_len == my_addr_len &&
2486                     tconn->peer_addr_len == peer_addr_len &&
2487                     !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2488                     !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2489                         kref_get(&tconn->kref);
2490                         goto found;
2491                 }
2492         }
2493         tconn = NULL;
2494 found:
2495         rcu_read_unlock();
2496         return tconn;
2497 }
2498
2499 static int drbd_alloc_socket(struct drbd_socket *socket)
2500 {
2501         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2502         if (!socket->rbuf)
2503                 return -ENOMEM;
2504         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2505         if (!socket->sbuf)
2506                 return -ENOMEM;
2507         return 0;
2508 }
2509
2510 static void drbd_free_socket(struct drbd_socket *socket)
2511 {
2512         free_page((unsigned long) socket->sbuf);
2513         free_page((unsigned long) socket->rbuf);
2514 }
2515
2516 void conn_free_crypto(struct drbd_tconn *tconn)
2517 {
2518         drbd_free_sock(tconn);
2519
2520         crypto_free_hash(tconn->csums_tfm);
2521         crypto_free_hash(tconn->verify_tfm);
2522         crypto_free_hash(tconn->cram_hmac_tfm);
2523         crypto_free_hash(tconn->integrity_tfm);
2524         crypto_free_hash(tconn->peer_integrity_tfm);
2525         kfree(tconn->int_dig_in);
2526         kfree(tconn->int_dig_vv);
2527
2528         tconn->csums_tfm = NULL;
2529         tconn->verify_tfm = NULL;
2530         tconn->cram_hmac_tfm = NULL;
2531         tconn->integrity_tfm = NULL;
2532         tconn->peer_integrity_tfm = NULL;
2533         tconn->int_dig_in = NULL;
2534         tconn->int_dig_vv = NULL;
2535 }
2536
2537 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2538 {
2539         cpumask_var_t new_cpu_mask;
2540         int err;
2541
2542         if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2543                 return -ENOMEM;
2544                 /*
2545                 retcode = ERR_NOMEM;
2546                 drbd_msg_put_info("unable to allocate cpumask");
2547                 */
2548
2549         /* silently ignore cpu mask on UP kernel */
2550         if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2551                 /* FIXME: Get rid of constant 32 here */
2552                 err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
2553                                 cpumask_bits(new_cpu_mask), nr_cpu_ids);
2554                 if (err) {
2555                         conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
2556                         /* retcode = ERR_CPU_MASK_PARSE; */
2557                         goto fail;
2558                 }
2559         }
2560         tconn->res_opts = *res_opts;
2561         if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2562                 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2563                 drbd_calc_cpu_mask(tconn);
2564                 tconn->receiver.reset_cpu_mask = 1;
2565                 tconn->asender.reset_cpu_mask = 1;
2566                 tconn->worker.reset_cpu_mask = 1;
2567         }
2568         err = 0;
2569
2570 fail:
2571         free_cpumask_var(new_cpu_mask);
2572         return err;
2573
2574 }
2575
2576 /* caller must be under genl_lock() */
2577 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2578 {
2579         struct drbd_tconn *tconn;
2580
2581         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2582         if (!tconn)
2583                 return NULL;
2584
2585         tconn->name = kstrdup(name, GFP_KERNEL);
2586         if (!tconn->name)
2587                 goto fail;
2588
2589         if (drbd_alloc_socket(&tconn->data))
2590                 goto fail;
2591         if (drbd_alloc_socket(&tconn->meta))
2592                 goto fail;
2593
2594         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2595                 goto fail;
2596
2597         if (set_resource_options(tconn, res_opts))
2598                 goto fail;
2599
2600         if (!tl_init(tconn))
2601                 goto fail;
2602
2603         tconn->cstate = C_STANDALONE;
2604         mutex_init(&tconn->cstate_mutex);
2605         spin_lock_init(&tconn->req_lock);
2606         mutex_init(&tconn->conf_update);
2607         init_waitqueue_head(&tconn->ping_wait);
2608         idr_init(&tconn->volumes);
2609
2610         drbd_init_workqueue(&tconn->data.work);
2611         mutex_init(&tconn->data.mutex);
2612
2613         drbd_init_workqueue(&tconn->meta.work);
2614         mutex_init(&tconn->meta.mutex);
2615
2616         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2617         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2618         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2619
2620         kref_init(&tconn->kref);
2621         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2622
2623         return tconn;
2624
2625 fail:
2626         tl_cleanup(tconn);
2627         free_cpumask_var(tconn->cpu_mask);
2628         drbd_free_socket(&tconn->meta);
2629         drbd_free_socket(&tconn->data);
2630         kfree(tconn->name);
2631         kfree(tconn);
2632
2633         return NULL;
2634 }
2635
2636 void conn_destroy(struct kref *kref)
2637 {
2638         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2639
2640         idr_destroy(&tconn->volumes);
2641
2642         free_cpumask_var(tconn->cpu_mask);
2643         drbd_free_socket(&tconn->meta);
2644         drbd_free_socket(&tconn->data);
2645         kfree(tconn->name);
2646         kfree(tconn->int_dig_in);
2647         kfree(tconn->int_dig_vv);
2648         kfree(tconn);
2649 }
2650
2651 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2652 {
2653         struct drbd_conf *mdev;
2654         struct gendisk *disk;
2655         struct request_queue *q;
2656         int vnr_got = vnr;
2657         int minor_got = minor;
2658         enum drbd_ret_code err = ERR_NOMEM;
2659
2660         mdev = minor_to_mdev(minor);
2661         if (mdev)
2662                 return ERR_MINOR_EXISTS;
2663
2664         /* GFP_KERNEL, we are outside of all write-out paths */
2665         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2666         if (!mdev)
2667                 return ERR_NOMEM;
2668
2669         kref_get(&tconn->kref);
2670         mdev->tconn = tconn;
2671
2672         mdev->minor = minor;
2673         mdev->vnr = vnr;
2674
2675         drbd_init_set_defaults(mdev);
2676
2677         q = blk_alloc_queue(GFP_KERNEL);
2678         if (!q)
2679                 goto out_no_q;
2680         mdev->rq_queue = q;
2681         q->queuedata   = mdev;
2682
2683         disk = alloc_disk(1);
2684         if (!disk)
2685                 goto out_no_disk;
2686         mdev->vdisk = disk;
2687
2688         set_disk_ro(disk, true);
2689
2690         disk->queue = q;
2691         disk->major = DRBD_MAJOR;
2692         disk->first_minor = minor;
2693         disk->fops = &drbd_ops;
2694         sprintf(disk->disk_name, "drbd%d", minor);
2695         disk->private_data = mdev;
2696
2697         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2698         /* we have no partitions. we contain only ourselves. */
2699         mdev->this_bdev->bd_contains = mdev->this_bdev;
2700
2701         q->backing_dev_info.congested_fn = drbd_congested;
2702         q->backing_dev_info.congested_data = mdev;
2703
2704         blk_queue_make_request(q, drbd_make_request);
2705         /* Setting the max_hw_sectors to an odd value of 8 KiB here.
2706            This triggers a max_bio_size message upon first attach or connect */
2707         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2708         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2709         blk_queue_merge_bvec(q, drbd_merge_bvec);
2710         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2711
2712         mdev->md_io_page = alloc_page(GFP_KERNEL);
2713         if (!mdev->md_io_page)
2714                 goto out_no_io_page;
2715
2716         if (drbd_bm_init(mdev))
2717                 goto out_no_bitmap;
2718         mdev->read_requests = RB_ROOT;
2719         mdev->write_requests = RB_ROOT;
2720
2721         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2722         if (!mdev->current_epoch)
2723                 goto out_no_epoch;
2724
2725         INIT_LIST_HEAD(&mdev->current_epoch->list);
2726         mdev->epochs = 1;
2727
2728         if (!idr_pre_get(&minors, GFP_KERNEL))
2729                 goto out_no_minor_idr;
2730         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2731                 goto out_no_minor_idr;
2732         if (minor_got != minor) {
2733                 err = ERR_MINOR_EXISTS;
2734                 drbd_msg_put_info("requested minor exists already");
2735                 goto out_idr_remove_minor;
2736         }
2737
2738         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2739                 goto out_idr_remove_minor;
2740         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2741                 goto out_idr_remove_minor;
2742         if (vnr_got != vnr) {
2743                 err = ERR_INVALID_REQUEST;
2744                 drbd_msg_put_info("requested volume exists already");
2745                 goto out_idr_remove_vol;
2746         }
2747         add_disk(disk);
2748         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2749
2750         /* inherit the connection state */
2751         mdev->state.conn = tconn->cstate;
2752         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2753                 drbd_connected(mdev);
2754
2755         return NO_ERROR;
2756
2757 out_idr_remove_vol:
2758         idr_remove(&tconn->volumes, vnr_got);
2759 out_idr_remove_minor:
2760         idr_remove(&minors, minor_got);
2761         synchronize_rcu();
2762 out_no_minor_idr:
2763         kfree(mdev->current_epoch);
2764 out_no_epoch:
2765         drbd_bm_cleanup(mdev);
2766 out_no_bitmap:
2767         __free_page(mdev->md_io_page);
2768 out_no_io_page:
2769         put_disk(disk);
2770 out_no_disk:
2771         blk_cleanup_queue(q);
2772 out_no_q:
2773         kfree(mdev);
2774         kref_put(&tconn->kref, &conn_destroy);
2775         return err;
2776 }
2777
2778 int __init drbd_init(void)
2779 {
2780         int err;
2781
2782         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2783                 printk(KERN_ERR
2784                        "drbd: invalid minor_count (%d)\n", minor_count);
2785 #ifdef MODULE
2786                 return -EINVAL;
2787 #else
2788                 minor_count = DRBD_MINOR_COUNT_DEF;
2789 #endif
2790         }
2791
2792         err = register_blkdev(DRBD_MAJOR, "drbd");
2793         if (err) {
2794                 printk(KERN_ERR
2795                        "drbd: unable to register block device major %d\n",
2796                        DRBD_MAJOR);
2797                 return err;
2798         }
2799
2800         err = drbd_genl_register();
2801         if (err) {
2802                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2803                 goto fail;
2804         }
2805
2806
2807         register_reboot_notifier(&drbd_notifier);
2808
2809         /*
2810          * allocate all necessary structs
2811          */
2812         err = -ENOMEM;
2813
2814         init_waitqueue_head(&drbd_pp_wait);
2815
2816         drbd_proc = NULL; /* play safe for drbd_cleanup */
2817         idr_init(&minors);
2818
2819         err = drbd_create_mempools();
2820         if (err)
2821                 goto fail;
2822
2823         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2824         if (!drbd_proc) {
2825                 printk(KERN_ERR "drbd: unable to register proc file\n");
2826                 goto fail;
2827         }
2828
2829         rwlock_init(&global_state_lock);
2830         INIT_LIST_HEAD(&drbd_tconns);
2831
2832         printk(KERN_INFO "drbd: initialized. "
2833                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2834                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2835         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2836         printk(KERN_INFO "drbd: registered as block device major %d\n",
2837                 DRBD_MAJOR);
2838
2839         return 0; /* Success! */
2840
2841 fail:
2842         drbd_cleanup();
2843         if (err == -ENOMEM)
2844                 /* currently always the case */
2845                 printk(KERN_ERR "drbd: ran out of memory\n");
2846         else
2847                 printk(KERN_ERR "drbd: initialization failure\n");
2848         return err;
2849 }
2850
2851 void drbd_free_bc(struct drbd_backing_dev *ldev)
2852 {
2853         if (ldev == NULL)
2854                 return;
2855
2856         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2857         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2858
2859         kfree(ldev);
2860 }
2861
2862 void drbd_free_sock(struct drbd_tconn *tconn)
2863 {
2864         if (tconn->data.socket) {
2865                 mutex_lock(&tconn->data.mutex);
2866                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2867                 sock_release(tconn->data.socket);
2868                 tconn->data.socket = NULL;
2869                 mutex_unlock(&tconn->data.mutex);
2870         }
2871         if (tconn->meta.socket) {
2872                 mutex_lock(&tconn->meta.mutex);
2873                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2874                 sock_release(tconn->meta.socket);
2875                 tconn->meta.socket = NULL;
2876                 mutex_unlock(&tconn->meta.mutex);
2877         }
2878 }
2879
2880 /* meta data management */
2881
2882 struct meta_data_on_disk {
2883         u64 la_size;           /* last agreed size. */
2884         u64 uuid[UI_SIZE];   /* UUIDs. */
2885         u64 device_uuid;
2886         u64 reserved_u64_1;
2887         u32 flags;             /* MDF */
2888         u32 magic;
2889         u32 md_size_sect;
2890         u32 al_offset;         /* offset to this block */
2891         u32 al_nr_extents;     /* important for restoring the AL */
2892               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2893         u32 bm_offset;         /* offset to the bitmap, from here */
2894         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2895         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2896         u32 reserved_u32[3];
2897
2898 } __packed;
2899
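/* Endianness note (informational): every multi-byte field of struct
 * meta_data_on_disk is stored big-endian. drbd_md_sync() below converts on
 * write and drbd_md_read() converts back on read, e.g.:
 *
 *     buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);    (write side)
 *     magic = be32_to_cpu(buffer->magic);                       (read side)
 *
 * The buffer is a full 512-byte sector, zeroed before it is filled, so the
 * unused tail beyond the struct has a well-defined on-disk value. */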
2900 /**
2901  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2902  * @mdev:       DRBD device.
2903  */
2904 void drbd_md_sync(struct drbd_conf *mdev)
2905 {
2906         struct meta_data_on_disk *buffer;
2907         sector_t sector;
2908         int i;
2909
2910         del_timer(&mdev->md_sync_timer);
2911         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2912         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2913                 return;
2914
2915         /* We use here D_FAILED and not D_ATTACHING because we try to write
2916          * metadata even if we detach due to a disk failure! */
2917         if (!get_ldev_if_state(mdev, D_FAILED))
2918                 return;
2919
2920         buffer = drbd_md_get_buffer(mdev);
2921         if (!buffer)
2922                 goto out;
2923
2924         memset(buffer, 0, 512);
2925
2926         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2927         for (i = UI_CURRENT; i < UI_SIZE; i++)
2928                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2929         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2930         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
2931
2932         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2933         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2934         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2935         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2936         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2937
2938         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2939         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2940
2941         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2942         sector = mdev->ldev->md.md_offset;
2943
2944         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2945                 /* this was a try anyways ... */
2946                 dev_err(DEV, "meta data update failed!\n");
2947                 drbd_chk_io_error(mdev, 1, true);
2948         }
2949
2950         /* Update mdev->ldev->md.la_size_sect,
2951          * since we updated it on metadata. */
2952         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2953
2954         drbd_md_put_buffer(mdev);
2955 out:
2956         put_ldev(mdev);
2957 }
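/* Rough sketch of how the meta data write-out is typically driven by the
 * helpers further down in this file (illustration only):
 *
 *	drbd_md_mark_dirty(mdev);	// set MD_DIRTY, arm md_sync_timer
 *	...
 *	drbd_md_sync(mdev);		// explicit sync, no-op unless MD_DIRTY
 *
 * If nobody calls drbd_md_sync() in time, md_sync_timer_fn() queues
 * w_md_sync() on the worker, which then performs the write. */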
2958
2959 /**
2960  * drbd_md_read() - Reads in the meta data super block
2961  * @mdev:       DRBD device.
2962  * @bdev:       Device from which the meta data should be read in.
2963  *
2964  * Return NO_ERROR on success, and an enum drbd_ret_code in case
2965  * something goes wrong.
2966  */
2967 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2968 {
2969         struct meta_data_on_disk *buffer;
2970         u32 magic, flags;
2971         int i, rv = NO_ERROR;
2972
2973         if (!get_ldev_if_state(mdev, D_ATTACHING))
2974                 return ERR_IO_MD_DISK;
2975
2976         buffer = drbd_md_get_buffer(mdev);
2977         if (!buffer)
2978                 goto out;
2979
2980         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2981                 /* NOTE: can't do normal error processing here as this is
2982                    called BEFORE disk is attached */
2983                 dev_err(DEV, "Error while reading metadata.\n");
2984                 rv = ERR_IO_MD_DISK;
2985                 goto err;
2986         }
2987
2988         magic = be32_to_cpu(buffer->magic);
2989         flags = be32_to_cpu(buffer->flags);
2990         if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
2991             (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
2992                 /* btw: that's Activity Log clean, not "all" clean. */
2993                 dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
2994                 rv = ERR_MD_UNCLEAN;
2995                 goto err;
2996         }
2997         if (magic != DRBD_MD_MAGIC_08) {
2998                 if (magic == DRBD_MD_MAGIC_07)
2999                         dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
3000                 else
3001                         dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
3002                 rv = ERR_MD_INVALID;
3003                 goto err;
3004         }
3005         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3006                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3007                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3008                 rv = ERR_MD_INVALID;
3009                 goto err;
3010         }
3011         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3012                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3013                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3014                 rv = ERR_MD_INVALID;
3015                 goto err;
3016         }
3017         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3018                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3019                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3020                 rv = ERR_MD_INVALID;
3021                 goto err;
3022         }
3023
3024         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3025                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3026                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3027                 rv = ERR_MD_INVALID;
3028                 goto err;
3029         }
3030
3031         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3032         for (i = UI_CURRENT; i < UI_SIZE; i++)
3033                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3034         bdev->md.flags = be32_to_cpu(buffer->flags);
3035         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3036
3037         spin_lock_irq(&mdev->tconn->req_lock);
3038         if (mdev->state.conn < C_CONNECTED) {
3039                 int peer;
3040                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3041                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3042                 mdev->peer_max_bio_size = peer;
3043         }
3044         spin_unlock_irq(&mdev->tconn->req_lock);
3045
3046  err:
3047         drbd_md_put_buffer(mdev);
3048  out:
3049         put_ldev(mdev);
3050
3051         return rv;
3052 }
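/* Apart from the magic, the on-disk offsets and sizes are cross-checked
 * against the values already set up in bdev->md; any mismatch is reported
 * as ERR_MD_INVALID instead of trusting the on-disk values. */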
3053
3054 /**
3055  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3056  * @mdev:       DRBD device.
3057  *
3058  * Call this function if you change anything that should be written to
3059  * the meta-data super block. This function sets MD_DIRTY, and starts a
3060  * timer that ensures drbd_md_sync() gets called within five seconds.
3061  */
3062 #ifdef DEBUG
3063 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3064 {
3065         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3066                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3067                 mdev->last_md_mark_dirty.line = line;
3068                 mdev->last_md_mark_dirty.func = func;
3069         }
3070 }
3071 #else
3072 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3073 {
3074         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3075                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3076 }
3077 #endif
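/* Note: the DEBUG variant arms the timer with a shorter (one second) timeout
 * and records the call site, which w_md_sync() below prints when the timer
 * actually expires. */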
3078
3079 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3080 {
3081         int i;
3082
3083         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3084                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3085 }
3086
3087 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3088 {
3089         if (idx == UI_CURRENT) {
3090                 if (mdev->state.role == R_PRIMARY)
3091                         val |= 1;
3092                 else
3093                         val &= ~((u64)1);
3094
3095                 drbd_set_ed_uuid(mdev, val);
3096         }
3097
3098         mdev->ldev->md.uuid[idx] = val;
3099         drbd_md_mark_dirty(mdev);
3100 }
3101
3102
3103 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3104 {
3105         if (mdev->ldev->md.uuid[idx]) {
3106                 drbd_uuid_move_history(mdev);
3107                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3108         }
3109         _drbd_uuid_set(mdev, idx, val);
3110 }
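/* drbd_uuid_set() rotates the slot's old value into the UUID history before
 * writing it; _drbd_uuid_set() just stores the new value (and, for UI_CURRENT,
 * encodes the current role in bit 0 and updates the exposed data UUID). */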
3111
3112 /**
3113  * drbd_uuid_new_current() - Creates a new current UUID
3114  * @mdev:       DRBD device.
3115  *
3116  * Creates a new current UUID, and rotates the old current UUID into
3117  * the bitmap slot. Causes an incremental resync upon next connect.
3118  */
3119 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3120 {
3121         u64 val;
3122         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3123
3124         if (bm_uuid)
3125                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3126
3127         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3128
3129         get_random_bytes(&val, sizeof(u64));
3130         _drbd_uuid_set(mdev, UI_CURRENT, val);
3131         drbd_print_uuids(mdev, "new current UUID");
3132         /* get it to stable storage _now_ */
3133         drbd_md_sync(mdev);
3134 }
3135
3136 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3137 {
3138         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3139                 return;
3140
3141         if (val == 0) {
3142                 drbd_uuid_move_history(mdev);
3143                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3144                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3145         } else {
3146                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3147                 if (bm_uuid)
3148                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3149
3150                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3151         }
3152         drbd_md_mark_dirty(mdev);
3153 }
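/* Clearing the bitmap UUID (val == 0) rotates the old value into the history;
 * setting it stores val with bit 0 cleared.  If both the old and the new
 * value are zero there is nothing to do, and the meta data stays clean. */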
3154
3155 /**
3156  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3157  * @mdev:       DRBD device.
3158  *
3159  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3160  */
3161 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3162 {
3163         int rv = -EIO;
3164
3165         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3166                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3167                 drbd_md_sync(mdev);
3168                 drbd_bm_set_all(mdev);
3169
3170                 rv = drbd_bm_write(mdev);
3171
3172                 if (!rv) {
3173                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3174                         drbd_md_sync(mdev);
3175                 }
3176
3177                 put_ldev(mdev);
3178         }
3179
3180         return rv;
3181 }
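/* Note the ordering above: MDF_FULL_SYNC is set and synced to disk *before*
 * the bitmap is written, and only cleared (and synced again) after the bitmap
 * write succeeded, so a crash in between still finds the full-sync intent on
 * stable storage. */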
3182
3183 /**
3184  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3185  * @mdev:       DRBD device.
3186  *
3187  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3188  */
3189 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3190 {
3191         int rv = -EIO;
3192
3193         drbd_resume_al(mdev);
3194         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3195                 drbd_bm_clear_all(mdev);
3196                 rv = drbd_bm_write(mdev);
3197                 put_ldev(mdev);
3198         }
3199
3200         return rv;
3201 }
3202
3203 static int w_bitmap_io(struct drbd_work *w, int unused)
3204 {
3205         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3206         struct drbd_conf *mdev = w->mdev;
3207         int rv = -EIO;
3208
3209         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3210
3211         if (get_ldev(mdev)) {
3212                 drbd_bm_lock(mdev, work->why, work->flags);
3213                 rv = work->io_fn(mdev);
3214                 drbd_bm_unlock(mdev);
3215                 put_ldev(mdev);
3216         }
3217
3218         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3219         wake_up(&mdev->misc_wait);
3220
3221         if (work->done)
3222                 work->done(mdev, rv);
3223
3224         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3225         work->why = NULL;
3226         work->flags = 0;
3227
3228         return 0;
3229 }
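/* Once the io_fn has run, BITMAP_IO is cleared and misc_wait is woken so that
 * application IO suspended because of the pending bitmap IO may resume; the
 * optional done() callback gets the io_fn's return value (or -EIO if the
 * local disk was gone). */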
3230
3231 void drbd_ldev_destroy(struct drbd_conf *mdev)
3232 {
3233         lc_destroy(mdev->resync);
3234         mdev->resync = NULL;
3235         lc_destroy(mdev->act_log);
3236         mdev->act_log = NULL;
3237         __no_warn(local,
3238                 drbd_free_bc(mdev->ldev);
3239                 mdev->ldev = NULL;);
3240
3241         clear_bit(GO_DISKLESS, &mdev->flags);
3242 }
3243
3244 static int w_go_diskless(struct drbd_work *w, int unused)
3245 {
3246         struct drbd_conf *mdev = w->mdev;
3247
3248         D_ASSERT(mdev->state.disk == D_FAILED);
3249         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3250          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3251          * the protected members anymore, though, so once put_ldev reaches zero
3252          * again, it will be safe to free them. */
3253         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3254         return 0;
3255 }
3256
3257 void drbd_go_diskless(struct drbd_conf *mdev)
3258 {
3259         D_ASSERT(mdev->state.disk == D_FAILED);
3260         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3261                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3262 }
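/* The GO_DISKLESS bit makes sure the work is queued at most once; it is
 * cleared again in drbd_ldev_destroy() above, once the backing device has
 * actually been released. */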
3263
3264 /**
3265  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3266  * @mdev:       DRBD device.
3267  * @io_fn:      IO callback to be called when bitmap IO is possible
3268  * @done:       callback to be called after the bitmap IO was performed
3269  * @why:        Descriptive text of the reason for doing the IO
3270  *
3271  * While IO on the bitmap happens we freeze application IO, thus ensuring
3272  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3273  * called from worker context. It MUST NOT be used while a previous such
3274  * work is still pending!
3275  */
3276 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3277                           int (*io_fn)(struct drbd_conf *),
3278                           void (*done)(struct drbd_conf *, int),
3279                           char *why, enum bm_flag flags)
3280 {
3281         D_ASSERT(current == mdev->tconn->worker.task);
3282
3283         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3284         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3285         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3286         if (mdev->bm_io_work.why)
3287                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3288                         why, mdev->bm_io_work.why);
3289
3290         mdev->bm_io_work.io_fn = io_fn;
3291         mdev->bm_io_work.done = done;
3292         mdev->bm_io_work.why = why;
3293         mdev->bm_io_work.flags = flags;
3294
3295         spin_lock_irq(&mdev->tconn->req_lock);
3296         set_bit(BITMAP_IO, &mdev->flags);
3297         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3298                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3299                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3300         }
3301         spin_unlock_irq(&mdev->tconn->req_lock);
3302 }
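/* Minimal usage sketch (the done callback and flags value are made up for
 * illustration, they are not taken from this file):
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     my_done_fn, "set_n_write example",
 *			     BM_LOCKED_MASK);
 *
 * The io_fn then runs in w_bitmap_io() above; note that the work is only
 * queued here immediately if ap_bio_cnt already dropped to zero. */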
3303
3304 /**
3305  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3306  * @mdev:       DRBD device.
3307  * @io_fn:      IO callback to be called when bitmap IO is possible
3308  * @why:        Descriptive text of the reason for doing the IO
3309  *
3310  * Freezes application IO while the actual IO operation runs. This
3311  * function MAY NOT be called from worker context.
3312  */
3313 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3314                 char *why, enum bm_flag flags)
3315 {
3316         int rv;
3317
3318         D_ASSERT(current != mdev->tconn->worker.task);
3319
3320         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3321                 drbd_suspend_io(mdev);
3322
3323         drbd_bm_lock(mdev, why, flags);
3324         rv = io_fn(mdev);
3325         drbd_bm_unlock(mdev);
3326
3327         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3328                 drbd_resume_io(mdev);
3329
3330         return rv;
3331 }
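/* In contrast to drbd_queue_bitmap_io() above, this is the synchronous
 * variant: it must not run on the worker, and it suspends/resumes application
 * IO itself unless BM_LOCKED_SET_ALLOWED is set in flags. */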
3332
3333 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3334 {
3335         if ((mdev->ldev->md.flags & flag) != flag) {
3336                 drbd_md_mark_dirty(mdev);
3337                 mdev->ldev->md.flags |= flag;
3338         }
3339 }
3340
3341 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3342 {
3343         if ((mdev->ldev->md.flags & flag) != 0) {
3344                 drbd_md_mark_dirty(mdev);
3345                 mdev->ldev->md.flags &= ~flag;
3346         }
3347 }
3348 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3349 {
3350         return (bdev->md.flags & flag) != 0;
3351 }
3352
3353 static void md_sync_timer_fn(unsigned long data)
3354 {
3355         struct drbd_conf *mdev = (struct drbd_conf *) data;
3356
3357         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3358 }
3359
3360 static int w_md_sync(struct drbd_work *w, int unused)
3361 {
3362         struct drbd_conf *mdev = w->mdev;
3363
3364         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3365 #ifdef DEBUG
3366         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3367                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3368 #endif
3369         drbd_md_sync(mdev);
3370         return 0;
3371 }
3372
3373 const char *cmdname(enum drbd_packet cmd)
3374 {
3375         /* THINK may need to become several global tables
3376          * when we want to support more than
3377          * one PRO_VERSION */
3378         static const char *cmdnames[] = {
3379                 [P_DATA]                = "Data",
3380                 [P_DATA_REPLY]          = "DataReply",
3381                 [P_RS_DATA_REPLY]       = "RSDataReply",
3382                 [P_BARRIER]             = "Barrier",
3383                 [P_BITMAP]              = "ReportBitMap",
3384                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3385                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3386                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3387                 [P_DATA_REQUEST]        = "DataRequest",
3388                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3389                 [P_SYNC_PARAM]          = "SyncParam",
3390                 [P_SYNC_PARAM89]        = "SyncParam89",
3391                 [P_PROTOCOL]            = "ReportProtocol",
3392                 [P_UUIDS]               = "ReportUUIDs",
3393                 [P_SIZES]               = "ReportSizes",
3394                 [P_STATE]               = "ReportState",
3395                 [P_SYNC_UUID]           = "ReportSyncUUID",
3396                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3397                 [P_AUTH_RESPONSE]       = "AuthResponse",
3398                 [P_PING]                = "Ping",
3399                 [P_PING_ACK]            = "PingAck",
3400                 [P_RECV_ACK]            = "RecvAck",
3401                 [P_WRITE_ACK]           = "WriteAck",
3402                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3403                 [P_DISCARD_WRITE]        = "DiscardWrite",
3404                 [P_NEG_ACK]             = "NegAck",
3405                 [P_NEG_DREPLY]          = "NegDReply",
3406                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3407                 [P_BARRIER_ACK]         = "BarrierAck",
3408                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3409                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3410                 [P_OV_REQUEST]          = "OVRequest",
3411                 [P_OV_REPLY]            = "OVReply",
3412                 [P_OV_RESULT]           = "OVResult",
3413                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3414                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3415                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3416                 [P_DELAY_PROBE]         = "DelayProbe",
3417                 [P_OUT_OF_SYNC]         = "OutOfSync",
3418                 [P_RETRY_WRITE]         = "RetryWrite",
3419                 [P_RS_CANCEL]           = "RSCancel",
3420                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3421                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3423                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3424
3425                 /* enum drbd_packet, but not commands - obsoleted flags:
3426                  *      P_MAY_IGNORE
3427                  *      P_MAX_OPT_CMD
3428                  */
3429         };
3430
3431         /* too big for the array: 0xfffX */
3432         if (cmd == P_INITIAL_META)
3433                 return "InitialMeta";
3434         if (cmd == P_INITIAL_DATA)
3435                 return "InitialData";
3436         if (cmd == P_CONNECTION_FEATURES)
3437                 return "ConnectionFeatures";
3438         if (cmd >= ARRAY_SIZE(cmdnames))
3439                 return "Unknown";
3440         return cmdnames[cmd];
3441 }
3442
3443 /**
3444  * drbd_wait_misc  -  wait for a request to make progress
3445  * @mdev:       device associated with the request
3446  * @i:          the struct drbd_interval embedded in struct drbd_request or
3447  *              struct drbd_peer_request
3448  */
3449 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3450 {
3451         struct net_conf *nc;
3452         DEFINE_WAIT(wait);
3453         long timeout;
3454
3455         rcu_read_lock();
3456         nc = rcu_dereference(mdev->tconn->net_conf);
3457         if (!nc) {
3458                 rcu_read_unlock();
3459                 return -ETIMEDOUT;
3460         }
3461         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3462         rcu_read_unlock();
3463
3464         /* Indicate that progress on this interval should wake up mdev->misc_wait. */
3465         i->waiting = true;
3466         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3467         spin_unlock_irq(&mdev->tconn->req_lock);
3468         timeout = schedule_timeout(timeout);
3469         finish_wait(&mdev->misc_wait, &wait);
3470         spin_lock_irq(&mdev->tconn->req_lock);
3471         if (!timeout || mdev->state.conn < C_CONNECTED)
3472                 return -ETIMEDOUT;
3473         if (signal_pending(current))
3474                 return -ERESTARTSYS;
3475         return 0;
3476 }
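/* Note that the wait above temporarily drops tconn->req_lock around
 * schedule_timeout(), so the caller must hold req_lock and has to re-check
 * its state after drbd_wait_misc() returns.  The timeout is derived from the
 * configured network timeout and ko-count. */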
3477
3478 #ifdef CONFIG_DRBD_FAULT_INJECTION
3479 /* Fault insertion support including random number generator shamelessly
3480  * stolen from kernel/rcutorture.c */
3481 struct fault_random_state {
3482         unsigned long state;
3483         unsigned long count;
3484 };
3485
3486 #define FAULT_RANDOM_MULT 39916801  /* prime */
3487 #define FAULT_RANDOM_ADD        479001701 /* prime */
3488 #define FAULT_RANDOM_REFRESH 10000
3489
3490 /*
3491  * Crude but fast random-number generator.  Uses a linear congruential
3492  * generator, with occasional help from get_random_bytes().
3493  */
3494 static unsigned long
3495 _drbd_fault_random(struct fault_random_state *rsp)
3496 {
3497         long refresh;
3498
3499         if (!rsp->count--) {
3500                 get_random_bytes(&refresh, sizeof(refresh));
3501                 rsp->state += refresh;
3502                 rsp->count = FAULT_RANDOM_REFRESH;
3503         }
3504         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3505         return swahw32(rsp->state);
3506 }
3507
3508 static char *_drbd_fault_str(unsigned int type)
3509 {
3510         static char *_faults[] = {
3511                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3512                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3513                 [DRBD_FAULT_RS_WR] = "Resync write",
3514                 [DRBD_FAULT_RS_RD] = "Resync read",
3515                 [DRBD_FAULT_DT_WR] = "Data write",
3516                 [DRBD_FAULT_DT_RD] = "Data read",
3517                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3518                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3519                 [DRBD_FAULT_AL_EE] = "EE allocation",
3520                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3521         };
3522
3523         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3524 }
3525
3526 unsigned int
3527 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3528 {
3529         static struct fault_random_state rrs = {0, 0};
3530
3531         unsigned int ret = (
3532                 (fault_devs == 0 ||
3533                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3534                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3535
3536         if (ret) {
3537                 fault_count++;
3538
3539                 if (__ratelimit(&drbd_ratelimit_state))
3540                         dev_warn(DEV, "***Simulating %s failure\n",
3541                                 _drbd_fault_str(type));
3542         }
3543
3544         return ret;
3545 }
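/* fault_rate is interpreted as a percentage, and fault_devs, if non-zero, is
 * a bitmask of minor numbers that restricts fault injection to the selected
 * devices. */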
3546 #endif
3547
3548 const char *drbd_buildtag(void)
3549 {
3550         /* When DRBD is built from external sources, this holds a reference
3551            to the git hash of the source code. */
3552
3553         static char buildtag[38] = "\0uilt-in";
3554
3555         if (buildtag[0] == 0) {
3556 #ifdef CONFIG_MODULES
3557                 if (THIS_MODULE != NULL)
3558                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3559                 else
3560 #endif
3561                         buildtag[0] = 'b';
3562         }
3563
3564         return buildtag;
3565 }
3566
3567 module_init(drbd_init)
3568 module_exit(drbd_cleanup)
3569
3570 EXPORT_SYMBOL(drbd_conn_str);
3571 EXPORT_SYMBOL(drbd_role_str);
3572 EXPORT_SYMBOL(drbd_disk_str);
3573 EXPORT_SYMBOL(drbd_set_st_err_str);