drbd: Introduce drbd_header_size()
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
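/* Illustrative usage (assumption, not from this file): when drbd is built in,
 * boot with "drbd.minor_count=8"; when built as a module,
 * "modprobe drbd minor_count=8" has the same effect. */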
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123 DEFINE_MUTEX(drbd_cfg_mutex);
124
125 struct kmem_cache *drbd_request_cache;
126 struct kmem_cache *drbd_ee_cache;       /* peer requests */
127 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
128 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
129 mempool_t *drbd_request_mempool;
130 mempool_t *drbd_ee_mempool;
131 mempool_t *drbd_md_io_page_pool;
132 struct bio_set *drbd_md_io_bio_set;
133
134 /* I do not use a standard mempool, because:
135    1) I want to hand out the pre-allocated objects first.
136    2) I want to be able to interrupt sleeping allocation with a signal.
137    Note: This is a single linked list, the next pointer is the private
138          member of struct page.
139  */
140 struct page *drbd_pp_pool;
141 spinlock_t   drbd_pp_lock;
142 int          drbd_pp_vacant;
143 wait_queue_head_t drbd_pp_wait;
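/*
 * Illustrative sketch (added for clarity, not part of drbd): chaining pages
 * through their ->private field, as the comment above describes.  The helper
 * names pp_push()/pp_pop() are hypothetical.
 *
 *	static void pp_push(struct page **pool, struct page *page)
 *	{
 *		set_page_private(page, (unsigned long)*pool);	// next = old head
 *		*pool = page;
 *	}
 *
 *	static struct page *pp_pop(struct page **pool)
 *	{
 *		struct page *page = *pool;
 *
 *		if (page) {
 *			*pool = (struct page *)page_private(page);
 *			set_page_private(page, 0);
 *		}
 *		return page;
 *	}
 */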
144
145 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
146
147 static const struct block_device_operations drbd_ops = {
148         .owner =   THIS_MODULE,
149         .open =    drbd_open,
150         .release = drbd_release,
151 };
152
153 static void bio_destructor_drbd(struct bio *bio)
154 {
155         bio_free(bio, drbd_md_io_bio_set);
156 }
157
158 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
159 {
160         struct bio *bio;
161
162         if (!drbd_md_io_bio_set)
163                 return bio_alloc(gfp_mask, 1);
164
165         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
166         if (!bio)
167                 return NULL;
168         bio->bi_destructor = bio_destructor_drbd;
169         return bio;
170 }
171
172 #ifdef __CHECKER__
173 /* When checking with sparse, and this is an inline function, sparse will
174    give tons of false positives. When this is a real function, sparse works.
175  */
176 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
177 {
178         int io_allowed;
179
180         atomic_inc(&mdev->local_cnt);
181         io_allowed = (mdev->state.disk >= mins);
182         if (!io_allowed) {
183                 if (atomic_dec_and_test(&mdev->local_cnt))
184                         wake_up(&mdev->misc_wait);
185         }
186         return io_allowed;
187 }
188
189 #endif
190
191 /**
192  * DOC: The transfer log
193  *
194  * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
195  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
196  * of the list. There is always at least one &struct drbd_tl_epoch object.
197  *
198  * Each &struct drbd_tl_epoch has a circular double linked list of requests
199  * attached.
200  */
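/*
 * Illustrative picture (added for clarity, not from the original source):
 *
 *   tconn->oldest_tle --> [epoch] --> [epoch] --> [epoch] <-- tconn->newest_tle
 *                            |           |           |
 *                         requests    requests    requests
 *
 * tl_release() retires the oldest epoch once its barrier ack arrives;
 * _tl_add_barrier() appends a fresh epoch at the newest end.
 */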
201 static int tl_init(struct drbd_tconn *tconn)
202 {
203         struct drbd_tl_epoch *b;
204
205         /* during device minor initialization, we may well use GFP_KERNEL */
206         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
207         if (!b)
208                 return 0;
209         INIT_LIST_HEAD(&b->requests);
210         INIT_LIST_HEAD(&b->w.list);
211         b->next = NULL;
212         b->br_number = 4711;
213         b->n_writes = 0;
214         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
215
216         tconn->oldest_tle = b;
217         tconn->newest_tle = b;
218         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch objects this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307    of the write to the local disk. To avoid corruption of
308    the slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and violating write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_del_init(&b->requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL b was the only barrier, and becomes the new
328                    barrier. Therefore tconn->oldest_tle already points to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
373                 }
374                 tmp = b->next;
375
376                 if (n_writes) {
377                         if (what == RESEND) {
378                                 b->n_writes = n_writes;
379                                 if (b->w.cb == NULL) {
380                                         b->w.cb = w_send_barrier;
381                                         inc_ap_pending(b->w.mdev);
382                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383                                 }
384
385                                 drbd_queue_work(&tconn->data.work, &b->w);
386                         }
387                         pn = &b->next;
388                 } else {
389                         if (n_reads)
390                                 list_add(&carry_reads, &b->requests);
391                         /* there could still be requests on that ring list,
392                          * in case local io is still pending */
393                         list_del(&b->requests);
394
395                         /* dec_ap_pending corresponding to queue_barrier.
396                          * the newest barrier may not have been queued yet,
397                          * in which case w.cb is still NULL. */
398                         if (b->w.cb != NULL)
399                                 dec_ap_pending(b->w.mdev);
400
401                         if (b == tconn->newest_tle) {
402                                 /* recycle, but reinit! */
403                                 if (tmp != NULL)
404                                         conn_err(tconn, "ASSERT FAILED tmp == NULL\n");
405                                 INIT_LIST_HEAD(&b->requests);
406                                 list_splice(&carry_reads, &b->requests);
407                                 INIT_LIST_HEAD(&b->w.list);
408                                 b->w.cb = NULL;
409                                 b->br_number = net_random();
410                                 b->n_writes = 0;
411
412                                 *pn = b;
413                                 break;
414                         }
415                         *pn = tmp;
416                         kfree(b);
417                 }
418                 b = tmp;
419                 list_splice(&carry_reads, &b->requests);
420         }
421 }
422
423
424 /**
425  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
426  * @tconn:      DRBD connection.
427  *
428  * This is called after the connection to the peer was lost. The storage covered
429  * by the requests on the transfer log gets marked as out of sync. Called from the
430  * receiver thread and the worker thread.
431  */
432 void tl_clear(struct drbd_tconn *tconn)
433 {
434         struct drbd_conf *mdev;
435         struct list_head *le, *tle;
436         struct drbd_request *r;
437         int vnr;
438
439         spin_lock_irq(&tconn->req_lock);
440
441         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
442
443         /* we expect this list to be empty. */
444         if (!list_empty(&tconn->out_of_sequence_requests))
445                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
446
447         /* but just in case, clean it up anyway! */
448         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
449                 r = list_entry(le, struct drbd_request, tl_requests);
450                 /* It would be nice to complete outside of spinlock.
451                  * But this is easier for now. */
452                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
453         }
454
455         /* ensure bit indicating barrier is required is clear */
456         idr_for_each_entry(&tconn->volumes, mdev, vnr)
457                 clear_bit(CREATE_BARRIER, &mdev->flags);
458
459         spin_unlock_irq(&tconn->req_lock);
460 }
461
462 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
463 {
464         spin_lock_irq(&tconn->req_lock);
465         _tl_restart(tconn, what);
466         spin_unlock_irq(&tconn->req_lock);
467 }
468
469 static int drbd_thread_setup(void *arg)
470 {
471         struct drbd_thread *thi = (struct drbd_thread *) arg;
472         struct drbd_tconn *tconn = thi->tconn;
473         unsigned long flags;
474         int retval;
475
476         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
477                  thi->name[0], thi->tconn->name);
478
479 restart:
480         retval = thi->function(thi);
481
482         spin_lock_irqsave(&thi->t_lock, flags);
483
484         /* if the receiver has been "EXITING", the last thing it did
485          * was set the conn state to "StandAlone",
486          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
487          * and receiver thread will be "started".
488          * drbd_thread_start needs to set "RESTARTING" in that case.
489          * t_state check and assignment needs to be within the same spinlock,
490          * so either thread_start sees EXITING, and can remap to RESTARTING,
491  * or thread_start sees NONE, and can proceed as normal.
492          */
493
494         if (thi->t_state == RESTARTING) {
495                 conn_info(tconn, "Restarting %s thread\n", thi->name);
496                 thi->t_state = RUNNING;
497                 spin_unlock_irqrestore(&thi->t_lock, flags);
498                 goto restart;
499         }
500
501         thi->task = NULL;
502         thi->t_state = NONE;
503         smp_mb();
504         complete(&thi->stop);
505         spin_unlock_irqrestore(&thi->t_lock, flags);
506
507         conn_info(tconn, "Terminating %s\n", current->comm);
508
509         /* Release mod reference taken when thread was started */
510         module_put(THIS_MODULE);
511         return retval;
512 }
513
514 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
515                              int (*func) (struct drbd_thread *), char *name)
516 {
517         spin_lock_init(&thi->t_lock);
518         thi->task    = NULL;
519         thi->t_state = NONE;
520         thi->function = func;
521         thi->tconn = tconn;
522         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
523 }
524
525 int drbd_thread_start(struct drbd_thread *thi)
526 {
527         struct drbd_tconn *tconn = thi->tconn;
528         struct task_struct *nt;
529         unsigned long flags;
530
531         /* is used from state engine doing drbd_thread_stop_nowait,
532          * while holding the req lock irqsave */
533         spin_lock_irqsave(&thi->t_lock, flags);
534
535         switch (thi->t_state) {
536         case NONE:
537                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
538                          thi->name, current->comm, current->pid);
539
540                 /* Get ref on module for thread - this is released when thread exits */
541                 if (!try_module_get(THIS_MODULE)) {
542                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
543                         spin_unlock_irqrestore(&thi->t_lock, flags);
544                         return false;
545                 }
546
547                 init_completion(&thi->stop);
548                 thi->reset_cpu_mask = 1;
549                 thi->t_state = RUNNING;
550                 spin_unlock_irqrestore(&thi->t_lock, flags);
551                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
552
553                 nt = kthread_create(drbd_thread_setup, (void *) thi,
554                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
555
556                 if (IS_ERR(nt)) {
557                         conn_err(tconn, "Couldn't start thread\n");
558
559                         module_put(THIS_MODULE);
560                         return false;
561                 }
562                 spin_lock_irqsave(&thi->t_lock, flags);
563                 thi->task = nt;
564                 thi->t_state = RUNNING;
565                 spin_unlock_irqrestore(&thi->t_lock, flags);
566                 wake_up_process(nt);
567                 break;
568         case EXITING:
569                 thi->t_state = RESTARTING;
570                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
571                                 thi->name, current->comm, current->pid);
572                 /* fall through */
573         case RUNNING:
574         case RESTARTING:
575         default:
576                 spin_unlock_irqrestore(&thi->t_lock, flags);
577                 break;
578         }
579
580         return true;
581 }
582
583
584 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
585 {
586         unsigned long flags;
587
588         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
589
590         /* may be called from state engine, holding the req lock irqsave */
591         spin_lock_irqsave(&thi->t_lock, flags);
592
593         if (thi->t_state == NONE) {
594                 spin_unlock_irqrestore(&thi->t_lock, flags);
595                 if (restart)
596                         drbd_thread_start(thi);
597                 return;
598         }
599
600         if (thi->t_state != ns) {
601                 if (thi->task == NULL) {
602                         spin_unlock_irqrestore(&thi->t_lock, flags);
603                         return;
604                 }
605
606                 thi->t_state = ns;
607                 smp_mb();
608                 init_completion(&thi->stop);
609                 if (thi->task != current)
610                         force_sig(DRBD_SIGKILL, thi->task);
611         }
612
613         spin_unlock_irqrestore(&thi->t_lock, flags);
614
615         if (wait)
616                 wait_for_completion(&thi->stop);
617 }
618
619 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
620 {
621         struct drbd_thread *thi =
622                 task == tconn->receiver.task ? &tconn->receiver :
623                 task == tconn->asender.task  ? &tconn->asender :
624                 task == tconn->worker.task   ? &tconn->worker : NULL;
625
626         return thi;
627 }
628
629 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
630 {
631         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
632         return thi ? thi->name : task->comm;
633 }
634
635 int conn_lowest_minor(struct drbd_tconn *tconn)
636 {
637         int vnr = 0;
638         struct drbd_conf *mdev;
639
640         mdev = idr_get_next(&tconn->volumes, &vnr);
641         if (!mdev)
642                 return -1;
643         return mdev_to_minor(mdev);
644 }
645
646 #ifdef CONFIG_SMP
647 /**
648  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
649  * @tconn:      DRBD connection.
650  *
651  * Forces all threads of a device onto the same CPU. This is beneficial for
652  * DRBD's performance. May be overridden by the user's configuration.
653  */
654 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
655 {
656         int ord, cpu;
657
658         /* user override. */
659         if (cpumask_weight(tconn->cpu_mask))
660                 return;
661
662         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
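	/* e.g. with 4 CPUs online and lowest minor 5, ord == 1, so the
	 * loop below picks the second online CPU (illustrative example) */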
663         for_each_online_cpu(cpu) {
664                 if (ord-- == 0) {
665                         cpumask_set_cpu(cpu, tconn->cpu_mask);
666                         return;
667                 }
668         }
669         /* should not be reached */
670         cpumask_setall(tconn->cpu_mask);
671 }
672
673 /**
674  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
676  * @thi:        drbd_thread object
677  *
678  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
679  * prematurely.
680  */
681 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
682 {
683         struct task_struct *p = current;
684
685         if (!thi->reset_cpu_mask)
686                 return;
687         thi->reset_cpu_mask = 0;
688         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
689 }
690 #endif
691
692 /**
693  * drbd_header_size  -  size of a packet header
694  *
695  * The header size is a multiple of 8, so any payload following the header is
696  * word aligned on 64-bit architectures.  (The bitmap send and receive code
697  * relies on this.)
698  */
699 unsigned int drbd_header_size(struct drbd_tconn *tconn)
700 {
701         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
702         BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
703         return sizeof(struct p_header80);
704 }
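/*
 * Hypothetical caller sketch (assumption, not taken from this file): a sender
 * that wants to place its payload directly behind the header could do
 *
 *	void *payload = (char *)sock->sbuf + drbd_header_size(tconn);
 *
 * and the BUILD_BUG_ONs above keep such a payload 8-byte aligned.
 */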
705
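/* On-the-wire header flavours (summarizing the two helpers below):
 * protocol < 95 uses a 32 bit magic, 16 bit command and 16 bit length;
 * protocol >= 95 uses a 16 bit magic, 16 bit command and 32 bit length,
 * so payloads are no longer limited to 64 KiB - 1. */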
706 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
707 {
708         h->magic   = cpu_to_be32(DRBD_MAGIC);
709         h->command = cpu_to_be16(cmd);
710         h->length  = cpu_to_be16(size);
711 }
712
713 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
714 {
715         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
716         h->command = cpu_to_be16(cmd);
717         h->length  = cpu_to_be32(size);
718 }
719
720 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
721                             enum drbd_packet cmd, int size)
722 {
723         if (tconn->agreed_pro_version >= 95)
724                 prepare_header95(&h->h95, cmd, size);
725         else
726                 prepare_header80(&h->h80, cmd, size);
727 }
728
729 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
730                            enum drbd_packet cmd, int size)
731 {
732         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
733 }
734
735 /* the appropriate socket mutex must be held already */
736 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct drbd_socket *sock,
737                    enum drbd_packet cmd, struct p_header *h, size_t size,
738                    unsigned msg_flags)
739 {
740         int err;
741
742         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
743         err = drbd_send_all(tconn, sock->socket, h, size, msg_flags);
744         if (err && !signal_pending(current))
745                 conn_warn(tconn, "short send %s size=%d\n",
746                           cmdname(cmd), (int)size);
747         return err;
748 }
749
750 /* don't pass the socket. we may only look at it
751  * when we hold the appropriate socket mutex.
752  */
753 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct drbd_socket *sock,
754                   enum drbd_packet cmd, struct p_header *h, size_t size)
755 {
756         int err = -EIO;
757
758         mutex_lock(&sock->mutex);
759         if (sock->socket)
760                 err = _conn_send_cmd(tconn, vnr, sock, cmd, h, size, 0);
761         mutex_unlock(&sock->mutex);
762         return err;
763 }
764
765 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
766                    size_t size)
767 {
768         struct p_header80 h;
769         int err;
770
771         prepare_header80(&h, cmd, size);
772         err = drbd_get_data_sock(tconn);
773         if (!err) {
774                 err = drbd_send_all(tconn, tconn->data.socket, &h, sizeof(h), 0);
775                 if (!err)
776                         err = drbd_send_all(tconn, tconn->data.socket, data, size, 0);
777                 drbd_put_data_sock(tconn);
778         }
779         return err;
780 }
781
782 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
783 {
784         mutex_lock(&sock->mutex);
785         if (!sock->socket) {
786                 mutex_unlock(&sock->mutex);
787                 return NULL;
788         }
789         return sock->sbuf;
790 }
791
792 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
793 {
794         return conn_prepare_command(mdev->tconn, sock);
795 }
796
797 static int __send_command(struct drbd_tconn *tconn, int vnr,
798                           struct drbd_socket *sock, enum drbd_packet cmd,
799                           unsigned int header_size, void *data,
800                           unsigned int size)
801 {
802         int msg_flags;
803         int err;
804
805         /*
806          * Called with @data == NULL and the size of the data blocks in @size
807          * for commands that send data blocks.  For those commands, omit the
808          * MSG_MORE flag: this will increase the likelihood that data blocks
809          * which are page aligned on the sender will end up page aligned on the
810          * receiver.
811          */
812         msg_flags = data ? MSG_MORE : 0;
813
814         _prepare_header(tconn, vnr, sock->sbuf, cmd,
815                         header_size - sizeof(struct p_header) + size);
816         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
817                             msg_flags);
818         if (data && !err)
819                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
820         return err;
821 }
822
823 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
824                       enum drbd_packet cmd, unsigned int header_size,
825                       void *data, unsigned int size)
826 {
827         int err;
828
829         err = __send_command(tconn, 0, sock, cmd, header_size, data, size);
830         mutex_unlock(&sock->mutex);
831         return err;
832 }
833
834 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
835                       enum drbd_packet cmd, unsigned int header_size,
836                       void *data, unsigned int size)
837 {
838         int err;
839
840         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
841                              data, size);
842         mutex_unlock(&sock->mutex);
843         return err;
844 }
845
846 int drbd_send_ping(struct drbd_tconn *tconn)
847 {
848         struct p_header h;
849         return conn_send_cmd(tconn, 0, &tconn->meta, P_PING, &h, sizeof(h));
850 }
851
852 int drbd_send_ping_ack(struct drbd_tconn *tconn)
853 {
854         struct p_header h;
855         return conn_send_cmd(tconn, 0, &tconn->meta, P_PING_ACK, &h, sizeof(h));
856 }
857
858 int drbd_send_sync_param(struct drbd_conf *mdev)
859 {
860         struct p_rs_param_95 *p;
861         struct drbd_socket *sock;
862         int size, err;
863         const int apv = mdev->tconn->agreed_pro_version;
864
865         size = apv <= 87 ? sizeof(struct p_rs_param)
866                 : apv == 88 ? sizeof(struct p_rs_param)
867                         + strlen(mdev->tconn->net_conf->verify_alg) + 1
868                 : apv <= 94 ? sizeof(struct p_rs_param_89)
869                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
870
871         mutex_lock(&mdev->tconn->data.mutex);
872         sock = &mdev->tconn->data;
873
874         if (likely(sock->socket != NULL)) {
875                 enum drbd_packet cmd =
876                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
877
878                 p = mdev->tconn->data.sbuf;
879
880                 /* initialize verify_alg and csums_alg */
881                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
882
883                 if (get_ldev(mdev)) {
884                         p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
885                         p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
886                         p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
887                         p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
888                         p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
889                         put_ldev(mdev);
890                 } else {
891                         p->rate = cpu_to_be32(DRBD_RATE_DEF);
892                         p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
893                         p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
894                         p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
895                         p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
896                 }
897
898                 if (apv >= 88)
899                         strcpy(p->verify_alg, mdev->tconn->net_conf->verify_alg);
900                 if (apv >= 89)
901                         strcpy(p->csums_alg, mdev->tconn->net_conf->csums_alg);
902
903                 err = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
904         } else
905                 err = -EIO;
906
907         mutex_unlock(&mdev->tconn->data.mutex);
908
909         return err;
910 }
911
912 int drbd_send_protocol(struct drbd_tconn *tconn)
913 {
914         struct p_protocol *p;
915         int size, cf, err;
916
917         size = sizeof(struct p_protocol);
918
919         if (tconn->agreed_pro_version >= 87)
920                 size += strlen(tconn->net_conf->integrity_alg) + 1;
921
922         /* we must not recurse into our own queue,
923          * as that is blocked during handshake */
924         p = kmalloc(size, GFP_NOIO);
925         if (p == NULL)
926                 return -ENOMEM;
927
928         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
929         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
930         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
931         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
932         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
933
934         cf = 0;
935         if (tconn->net_conf->want_lose)
936                 cf |= CF_WANT_LOSE;
937         if (tconn->net_conf->dry_run) {
938                 if (tconn->agreed_pro_version >= 92)
939                         cf |= CF_DRY_RUN;
940                 else {
941                         conn_err(tconn, "--dry-run is not supported by peer");
942                         kfree(p);
943                         return -EOPNOTSUPP;
944                 }
945         }
946         p->conn_flags    = cpu_to_be32(cf);
947
948         if (tconn->agreed_pro_version >= 87)
949                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
950
951         err = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
952         kfree(p);
953         return err;
954 }
955
956 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
957 {
958         struct p_uuids p;
959         int i;
960
961         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
962                 return 0;
963
964         for (i = UI_CURRENT; i < UI_SIZE; i++)
965                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
966
967         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
968         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
969         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
970         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
971         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
972         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
973
974         put_ldev(mdev);
975
976         return drbd_send_cmd(mdev, &mdev->tconn->data, P_UUIDS, &p.head, sizeof(p));
977 }
978
979 int drbd_send_uuids(struct drbd_conf *mdev)
980 {
981         return _drbd_send_uuids(mdev, 0);
982 }
983
984 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
985 {
986         return _drbd_send_uuids(mdev, 8);
987 }
988
989 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
990 {
991         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
992                 u64 *uuid = mdev->ldev->md.uuid;
993                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
994                      text,
995                      (unsigned long long)uuid[UI_CURRENT],
996                      (unsigned long long)uuid[UI_BITMAP],
997                      (unsigned long long)uuid[UI_HISTORY_START],
998                      (unsigned long long)uuid[UI_HISTORY_END]);
999                 put_ldev(mdev);
1000         } else {
1001                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1002                                 text,
1003                                 (unsigned long long)mdev->ed_uuid);
1004         }
1005 }
1006
1007 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1008 {
1009         struct p_rs_uuid p;
1010         u64 uuid;
1011
1012         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1013
1014         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1015         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1016         drbd_print_uuids(mdev, "updated sync UUID");
1017         drbd_md_sync(mdev);
1018         p.uuid = cpu_to_be64(uuid);
1019
1020         drbd_send_cmd(mdev, &mdev->tconn->data, P_SYNC_UUID, &p.head, sizeof(p));
1021 }
1022
1023 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1024 {
1025         struct p_sizes p;
1026         sector_t d_size, u_size;
1027         int q_order_type, max_bio_size;
1028
1029         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1030                 D_ASSERT(mdev->ldev->backing_bdev);
1031                 d_size = drbd_get_max_capacity(mdev->ldev);
1032                 u_size = mdev->ldev->dc.disk_size;
1033                 q_order_type = drbd_queue_order_type(mdev);
1034                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1035                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1036                 put_ldev(mdev);
1037         } else {
1038                 d_size = 0;
1039                 u_size = 0;
1040                 q_order_type = QUEUE_ORDERED_NONE;
1041                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1042         }
1043
1044         p.d_size = cpu_to_be64(d_size);
1045         p.u_size = cpu_to_be64(u_size);
1046         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1047         p.max_bio_size = cpu_to_be32(max_bio_size);
1048         p.queue_order_type = cpu_to_be16(q_order_type);
1049         p.dds_flags = cpu_to_be16(flags);
1050
1051         return drbd_send_cmd(mdev, &mdev->tconn->data, P_SIZES, &p.head, sizeof(p));
1052 }
1053
1054 /**
1055  * drbd_send_state() - Sends the drbd state to the peer
1056  * @mdev:       DRBD device.
1057  */
1058 int drbd_send_state(struct drbd_conf *mdev)
1059 {
1060         struct drbd_socket *sock;
1061         struct p_state p;
1062         int err = -EIO;
1063
1064         mutex_lock(&mdev->tconn->data.mutex);
1065
1066         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1067         sock = &mdev->tconn->data;
1068
1069         if (likely(sock->socket != NULL))
1070                 err = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
1071
1072         mutex_unlock(&mdev->tconn->data.mutex);
1073
1074         return err;
1075 }
1076
1077 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
1078                          union drbd_state mask, union drbd_state val)
1079 {
1080         struct p_req_state p;
1081
1082         p.mask    = cpu_to_be32(mask.i);
1083         p.val     = cpu_to_be32(val.i);
1084
1085         return conn_send_cmd(tconn, vnr, &tconn->data, cmd, &p.head, sizeof(p));
1086 }
1087
1088 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1089 {
1090         struct p_req_state_reply p;
1091
1092         p.retcode    = cpu_to_be32(retcode);
1093
1094         drbd_send_cmd(mdev, &mdev->tconn->meta, P_STATE_CHG_REPLY, &p.head, sizeof(p));
1095 }
1096
1097 int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1098 {
1099         struct p_req_state_reply p;
1100         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1101
1102         p.retcode    = cpu_to_be32(retcode);
1103
1104         return !conn_send_cmd(tconn, 0, &tconn->meta, cmd, &p.head, sizeof(p));
1105 }
1106
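/* Layout of p_compressed_bm->encoding, derived from the three helpers below:
 * bits 0-3: bitmap code (enum drbd_bitmap_code),
 * bits 4-6: number of pad bits at the end of the code string,
 * bit  7  : whether the first bit of the encoded range is set. */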
1107 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1108 {
1109         BUG_ON(code & ~0xf);
1110         p->encoding = (p->encoding & ~0xf) | code;
1111 }
1112
1113 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1114 {
1115         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1116 }
1117
1118 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1119 {
1120         BUG_ON(n & ~0x7);
1121         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1122 }
1123
1124 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1125         struct p_compressed_bm *p,
1126         struct bm_xfer_ctx *c)
1127 {
1128         struct bitstream bs;
1129         unsigned long plain_bits;
1130         unsigned long tmp;
1131         unsigned long rl;
1132         unsigned len;
1133         unsigned toggle;
1134         int bits;
1135
1136         /* may we use this feature? */
1137         if ((mdev->tconn->net_conf->use_rle == 0) ||
1138                 (mdev->tconn->agreed_pro_version < 90))
1139                         return 0;
1140
1141         if (c->bit_offset >= c->bm_bits)
1142                 return 0; /* nothing to do. */
1143
1144         /* use at most this many bytes */
1145         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1146         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1147         /* plain bits covered in this code string */
1148         plain_bits = 0;
1149
1150         /* p->encoding & 0x80 stores whether the first run length is set.
1151          * bit offset is implicit.
1152          * start with toggle == 2 to be able to tell the first iteration */
1153         toggle = 2;
1154
1155         /* see how many plain bits we can stuff into one packet
1156          * using RLE and VLI. */
1157         do {
1158                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1159                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1160                 if (tmp == -1UL)
1161                         tmp = c->bm_bits;
1162                 rl = tmp - c->bit_offset;
1163
1164                 if (toggle == 2) { /* first iteration */
1165                         if (rl == 0) {
1166                                 /* the first checked bit was set,
1167                                  * store start value, */
1168                                 dcbp_set_start(p, 1);
1169                                 /* but skip encoding of zero run length */
1170                                 toggle = !toggle;
1171                                 continue;
1172                         }
1173                         dcbp_set_start(p, 0);
1174                 }
1175
1176                 /* paranoia: catch zero runlength.
1177                  * can only happen if bitmap is modified while we scan it. */
1178                 if (rl == 0) {
1179                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1180                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1181                         return -1;
1182                 }
1183
1184                 bits = vli_encode_bits(&bs, rl);
1185                 if (bits == -ENOBUFS) /* buffer full */
1186                         break;
1187                 if (bits <= 0) {
1188                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1189                         return 0;
1190                 }
1191
1192                 toggle = !toggle;
1193                 plain_bits += rl;
1194                 c->bit_offset = tmp;
1195         } while (c->bit_offset < c->bm_bits);
1196
1197         len = bs.cur.b - p->code + !!bs.cur.bit;
1198
1199         if (plain_bits < (len << 3)) {
1200                 /* incompressible with this method.
1201                  * we need to rewind both word and bit position. */
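		/* e.g. a code string of 8 bytes (64 bits) covering only 57
		 * plain bits would be a net loss; the caller then falls back
		 * to a plain P_BITMAP packet (illustrative example). */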
1202                 c->bit_offset -= plain_bits;
1203                 bm_xfer_ctx_bit_to_word_offset(c);
1204                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1205                 return 0;
1206         }
1207
1208         /* RLE + VLI was able to compress it just fine.
1209          * update c->word_offset. */
1210         bm_xfer_ctx_bit_to_word_offset(c);
1211
1212         /* store pad_bits */
1213         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1214
1215         return len;
1216 }
1217
1218 /**
1219  * send_bitmap_rle_or_plain
1220  *
1221  * Return 0 when done, 1 when another iteration is needed, and a negative error
1222  * code upon failure.
1223  */
1224 static int
1225 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1226 {
1227         struct p_compressed_bm *p = mdev->tconn->data.sbuf;
1228         unsigned long num_words;
1229         int len, err;
1230
1231         len = fill_bitmap_rle_bits(mdev, p, c);
1232
1233         if (len < 0)
1234                 return -EIO;
1235
1236         if (len) {
1237                 dcbp_set_code(p, RLE_VLI_Bits);
1238                 err = _drbd_send_cmd(mdev, &mdev->tconn->data,
1239                                      P_COMPRESSED_BITMAP, &p->head,
1240                                      sizeof(*p) + len, 0);
1241
1242                 c->packets[0]++;
1243                 c->bytes[0] += sizeof(*p) + len;
1244
1245                 if (c->bit_offset >= c->bm_bits)
1246                         len = 0; /* DONE */
1247         } else {
1248                 /* was not compressible.
1249                  * send a buffer full of plain text bits instead. */
1250                 struct p_header *h = mdev->tconn->data.sbuf;
1251                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1252                 len = num_words * sizeof(long);
1253                 if (len)
1254                         drbd_bm_get_lel(mdev, c->word_offset, num_words,
1255                                         (unsigned long *)h->payload);
1256                 err = _drbd_send_cmd(mdev, &mdev->tconn->data, P_BITMAP,
1257                                      h, sizeof(struct p_header80) + len, 0);
1258                 c->word_offset += num_words;
1259                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1260
1261                 c->packets[1]++;
1262                 c->bytes[1] += sizeof(struct p_header80) + len;
1263
1264                 if (c->bit_offset > c->bm_bits)
1265                         c->bit_offset = c->bm_bits;
1266         }
1267         if (!err) {
1268                 if (len == 0) {
1269                         INFO_bm_xfer_stats(mdev, "send", c);
1270                         return 0;
1271                 } else
1272                         return 1;
1273         }
1274         return -EIO;
1275 }
1276
1277 /* See the comment at receive_bitmap() */
1278 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1279 {
1280         struct bm_xfer_ctx c;
1281         int err;
1282
1283         if (!expect(mdev->bitmap))
1284                 return false;
1285
1286         if (get_ldev(mdev)) {
1287                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1288                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1289                         drbd_bm_set_all(mdev);
1290                         if (drbd_bm_write(mdev)) {
1291                                 /* write_bm did fail! Leave full sync flag set in the meta data
1292                                  * but otherwise process as per normal - need to tell other
1293                                  * side that a full resync is required! */
1294                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1295                         } else {
1296                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1297                                 drbd_md_sync(mdev);
1298                         }
1299                 }
1300                 put_ldev(mdev);
1301         }
1302
1303         c = (struct bm_xfer_ctx) {
1304                 .bm_bits = drbd_bm_bits(mdev),
1305                 .bm_words = drbd_bm_words(mdev),
1306         };
1307
1308         do {
1309                 err = send_bitmap_rle_or_plain(mdev, &c);
1310         } while (err > 0);
1311
1312         return err == 0;
1313 }
1314
1315 int drbd_send_bitmap(struct drbd_conf *mdev)
1316 {
1317         int err;
1318
1319         if (drbd_get_data_sock(mdev->tconn))
1320                 return -1;
1321         err = !_drbd_send_bitmap(mdev);
1322         drbd_put_data_sock(mdev->tconn);
1323         return err;
1324 }
1325 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1326 {
1327         struct p_barrier_ack p;
1328
1329         p.barrier  = barrier_nr;
1330         p.set_size = cpu_to_be32(set_size);
1331
1332         if (mdev->state.conn >= C_CONNECTED)
1333                 drbd_send_cmd(mdev, &mdev->tconn->meta, P_BARRIER_ACK, &p.head, sizeof(p));
1334 }
1335
1336 /**
1337  * _drbd_send_ack() - Sends an ack packet
1338  * @mdev:       DRBD device.
1339  * @cmd:        Packet command code.
1340  * @sector:     sector, needs to be in big endian byte order
1341  * @blksize:    size in byte, needs to be in big endian byte order
1342  * @block_id:   Id, big endian byte order
1343  */
1344 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1345                           u64 sector, u32 blksize, u64 block_id)
1346 {
1347         struct p_block_ack p;
1348
1349         p.sector   = sector;
1350         p.block_id = block_id;
1351         p.blksize  = blksize;
1352         p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1353
1354         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1355                 return -EIO;
1356         return drbd_send_cmd(mdev, &mdev->tconn->meta, cmd, &p.head, sizeof(p));
1357 }
1358
1359 /* dp->sector and dp->block_id already/still in network byte order,
1360  * data_size is payload size according to dp->head,
1361  * and may need to be corrected for digest size. */
1362 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1363                       struct p_data *dp, int data_size)
1364 {
1365         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1366                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1367         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1368                        dp->block_id);
1369 }
1370
1371 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1372                       struct p_block_req *rp)
1373 {
1374         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1375 }
1376
1377 /**
1378  * drbd_send_ack() - Sends an ack packet
1379  * @mdev:       DRBD device
1380  * @cmd:        packet command code
1381  * @peer_req:   peer request
1382  */
1383 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1384                   struct drbd_peer_request *peer_req)
1385 {
1386         return _drbd_send_ack(mdev, cmd,
1387                               cpu_to_be64(peer_req->i.sector),
1388                               cpu_to_be32(peer_req->i.size),
1389                               peer_req->block_id);
1390 }
1391
1392 /* This function misuses the block_id field to signal if the blocks
1393  * are in sync or not. */
1394 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1395                      sector_t sector, int blksize, u64 block_id)
1396 {
1397         return _drbd_send_ack(mdev, cmd,
1398                               cpu_to_be64(sector),
1399                               cpu_to_be32(blksize),
1400                               cpu_to_be64(block_id));
1401 }
1402
1403 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1404                        sector_t sector, int size, u64 block_id)
1405 {
1406         struct p_block_req p;
1407
1408         p.sector   = cpu_to_be64(sector);
1409         p.block_id = block_id;
1410         p.blksize  = cpu_to_be32(size);
1411
1412         return drbd_send_cmd(mdev, &mdev->tconn->data, cmd, &p.head, sizeof(p));
1413 }
1414
1415 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1416                             void *digest, int digest_size, enum drbd_packet cmd)
1417 {
1418         int err;
1419         struct p_block_req p;
1420
1421         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1422         p.sector   = cpu_to_be64(sector);
1423         p.block_id = ID_SYNCER /* unused */;
1424         p.blksize  = cpu_to_be32(size);
1425
1426         mutex_lock(&mdev->tconn->data.mutex);
1427         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0);
1428         if (!err)
1429                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0);
1430         mutex_unlock(&mdev->tconn->data.mutex);
1431         return err;
1432 }
1433
1434 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1435 {
1436         struct p_block_req p;
1437
1438         p.sector   = cpu_to_be64(sector);
1439         p.block_id = ID_SYNCER /* unused */;
1440         p.blksize  = cpu_to_be32(size);
1441
1442         return drbd_send_cmd(mdev, &mdev->tconn->data, P_OV_REQUEST, &p.head, sizeof(p));
1443 }
1444
1445 /* called on sndtimeo
1446  * returns false if we should retry,
1447  * true if we think connection is dead
1448  */
1449 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1450 {
1451         int drop_it;
1452         /* long elapsed = (long)(jiffies - mdev->last_received); */
1453
1454         drop_it =   tconn->meta.socket == sock
1455                 || !tconn->asender.task
1456                 || get_t_state(&tconn->asender) != RUNNING
1457                 || tconn->cstate < C_WF_REPORT_PARAMS;
1458
1459         if (drop_it)
1460                 return true;
1461
1462         drop_it = !--tconn->ko_count;
1463         if (!drop_it) {
1464                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1465                          current->comm, current->pid, tconn->ko_count);
1466                 request_ping(tconn);
1467         }
1468
1469         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1470 }
1471
1472 static void drbd_update_congested(struct drbd_tconn *tconn)
1473 {
1474         struct sock *sk = tconn->data.socket->sk;
1475         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1476                 set_bit(NET_CONGESTED, &tconn->flags);
1477 }
1478
1479 /* The idea of sendpage seems to be to put some kind of reference
1480  * to the page into the skb, and to hand it over to the NIC. In
1481  * this process get_page() gets called.
1482  *
1483  * As soon as the page was really sent over the network put_page()
1484  * gets called by some part of the network layer. [ NIC driver? ]
1485  *
1486  * [ get_page() / put_page() increment/decrement the count. If count
1487  *   reaches 0 the page will be freed. ]
1488  *
1489  * This works nicely with pages from FSs.
1490  * But this means that in protocol A we might signal IO completion too early!
1491  *
1492  * In order not to corrupt data during a resync we must make sure
1493  * that we do not reuse our own buffer pages (EEs) too early, therefore
1494  * we have the net_ee list.
1495  *
1496  * XFS still seems to have problems with this: it submits pages with page_count == 0!
1497  * As a workaround, we disable sendpage on pages
1498  * with page_count == 0 or PageSlab.
1499  */
1500 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1501                               int offset, size_t size, unsigned msg_flags)
1502 {
1503         struct socket *socket;
1504         void *addr;
1505         int err;
1506
1507         socket = mdev->tconn->data.socket;
1508         addr = kmap(page) + offset;
1509         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1510         kunmap(page);
1511         if (!err)
1512                 mdev->send_cnt += size >> 9;
1513         return err;
1514 }
1515
1516 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1517                     int offset, size_t size, unsigned msg_flags)
1518 {
1519         struct socket *socket = mdev->tconn->data.socket;
1520         mm_segment_t oldfs = get_fs();
1521         int len = size;
1522         int err = -EIO;
1523
1524         /* e.g. XFS meta- & log-data is in slab pages, which have a
1525          * page_count of 0 and/or have PageSlab() set.
1526          * We cannot use sendpage for those, as it does get_page();
1527          * put_page(); that would either trigger a VM_BUG directly, or make
1528          * __page_cache_release free a page that is actually still referenced
1529          * by someone, leading to some obscure delayed Oops somewhere else. */
1530         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1531                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1532
1533         msg_flags |= MSG_NOSIGNAL;
1534         drbd_update_congested(mdev->tconn);
1535         set_fs(KERNEL_DS);
1536         do {
1537                 int sent;
1538
1539                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1540                 if (sent <= 0) {
1541                         if (sent == -EAGAIN) {
1542                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1543                                         break;
1544                                 continue;
1545                         }
1546                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1547                              __func__, (int)size, len, sent);
1548                         if (sent < 0)
1549                                 err = sent;
1550                         break;
1551                 }
1552                 len    -= sent;
1553                 offset += sent;
1554         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1555         set_fs(oldfs);
1556         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1557
1558         if (len == 0) {
1559                 err = 0;
1560                 mdev->send_cnt += size >> 9;
1561         }
1562         return err;
1563 }
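
/* Editor's note (not in the original): the guard at the top of
 * _drbd_send_page() is what implements the workaround described above
 * _drbd_no_send_page(): anything with page_count == 0 or PageSlab()
 * set (e.g. XFS meta/log data) falls back to the kmap() +
 * kernel_sendmsg() copy path, so sendpage never takes a page reference
 * it must not.  The zero-copy helpers below (_drbd_send_zc_bio,
 * _drbd_send_zc_ee) inherit that fallback automatically. */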
1564
1565 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1566 {
1567         struct bio_vec *bvec;
1568         int i;
1569         /* hint all but last page with MSG_MORE */
1570         __bio_for_each_segment(bvec, bio, i, 0) {
1571                 int err;
1572
1573                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1574                                          bvec->bv_offset, bvec->bv_len,
1575                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1576                 if (err)
1577                         return err;
1578         }
1579         return 0;
1580 }
1581
1582 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1583 {
1584         struct bio_vec *bvec;
1585         int i;
1586         /* hint all but last page with MSG_MORE */
1587         __bio_for_each_segment(bvec, bio, i, 0) {
1588                 int err;
1589
1590                 err = _drbd_send_page(mdev, bvec->bv_page,
1591                                       bvec->bv_offset, bvec->bv_len,
1592                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1593                 if (err)
1594                         return err;
1595         }
1596         return 0;
1597 }
1598
1599 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1600                             struct drbd_peer_request *peer_req)
1601 {
1602         struct page *page = peer_req->pages;
1603         unsigned len = peer_req->i.size;
1604         int err;
1605
1606         /* hint all but last page with MSG_MORE */
1607         page_chain_for_each(page) {
1608                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1609
1610                 err = _drbd_send_page(mdev, page, 0, l,
1611                                       page_chain_next(page) ? MSG_MORE : 0);
1612                 if (err)
1613                         return err;
1614                 len -= l;
1615         }
1616         return 0;
1617 }
1618
1619 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1620 {
1621         if (mdev->tconn->agreed_pro_version >= 95)
1622                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1623                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1624                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1625                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1626         else
1627                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1628 }
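
/* Editor's example (derived from the code above, illustrative only):
 * with agreed_pro_version >= 95, a bio submitted with REQ_SYNC | REQ_FUA
 * goes out as DP_RW_SYNC | DP_FUA; against an older peer only the
 * DP_RW_SYNC bit survives, because the remaining DP_* flags are simply
 * not sent to pre-95 peers. */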
1629
1630 /* Used to send write requests
1631  * R_PRIMARY -> Peer    (P_DATA)
1632  */
1633 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1634 {
1635         int err;
1636         struct p_data p;
1637         unsigned int dp_flags = 0;
1638         void *dgb;
1639         int dgs;
1640
1641         err = drbd_get_data_sock(mdev->tconn);
1642         if (err)
1643                 return err;
1644
1645         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1646                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1647
1648         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1649         p.sector   = cpu_to_be64(req->i.sector);
1650         p.block_id = (unsigned long)req;
1651         p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1652
1653         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1654
1655         if (mdev->state.conn >= C_SYNC_SOURCE &&
1656             mdev->state.conn <= C_PAUSED_SYNC_T)
1657                 dp_flags |= DP_MAY_SET_IN_SYNC;
1658
1659         p.dp_flags = cpu_to_be32(dp_flags);
1660         set_bit(UNPLUG_REMOTE, &mdev->flags);
1661         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p,
1662                             sizeof(p), dgs ? MSG_MORE : 0);
1663         if (!err && dgs) {
1664                 dgb = mdev->tconn->int_dig_out;
1665                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1666                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1667         }
1668         if (!err) {
1669                 /* For protocol A, we have to memcpy the payload into
1670                  * socket buffers, as we may complete the request as soon
1671                  * as we have handed it over to tcp, at which point the data
1672                  * pages may become invalid.
1673                  *
1674                  * With data integrity enabled, we copy it as well, so we can be
1675                  * sure that even if the bio pages are still being modified, it
1676                  * won't change the data on the wire; thus if the digest checks
1677                  * out ok after sending on this side, but does not match on the
1678                  * receiving side, we know the corruption happened elsewhere.
1679                  */
1680                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1681                         err = _drbd_send_bio(mdev, req->master_bio);
1682                 else
1683                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1684
1685                 /* double check digest, sometimes buffers have been modified in flight. */
1686                 if (dgs > 0 && dgs <= 64) {
1687                         /* 64 byte, 512 bit, is the largest digest size
1688                          * currently supported in kernel crypto. */
1689                         unsigned char digest[64];
1690                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1691                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1692                                 dev_warn(DEV,
1693                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1694                                         (unsigned long long)req->i.sector, req->i.size);
1695                         }
1696                 } /* else if (dgs > 64) {
1697                      ... Be noisy about digest too large ...
1698                 } */
1699         }
1700
1701         drbd_put_data_sock(mdev->tconn);
1702
1703         return err;
1704 }
1705
1706 /* answer packet, used to send data back for read requests:
1707  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1708  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1709  */
1710 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1711                     struct drbd_peer_request *peer_req)
1712 {
1713         int err;
1714         struct p_data p;
1715         void *dgb;
1716         int dgs;
1717
1718         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1719                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1720
1721         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1722                                            sizeof(struct p_header80) +
1723                                            dgs + peer_req->i.size);
1724         p.sector   = cpu_to_be64(peer_req->i.sector);
1725         p.block_id = peer_req->block_id;
1726         p.seq_num = 0;  /* unused */
1727
1728         /* Only called by our kernel thread.
1729          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1730          * in response to an admin command or module unload.
1731          */
1732         err = drbd_get_data_sock(mdev->tconn);
1733         if (err)
1734                 return err;
1735         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p,
1736                             sizeof(p), dgs ? MSG_MORE : 0);
1737         if (!err && dgs) {
1738                 dgb = mdev->tconn->int_dig_out;
1739                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1740                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, dgb,
1741                                     dgs, 0);
1742         }
1743         if (!err)
1744                 err = _drbd_send_zc_ee(mdev, peer_req);
1745         drbd_put_data_sock(mdev->tconn);
1746
1747         return err;
1748 }
1749
1750 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1751 {
1752         struct p_block_desc p;
1753
1754         p.sector  = cpu_to_be64(req->i.sector);
1755         p.blksize = cpu_to_be32(req->i.size);
1756
1757         return drbd_send_cmd(mdev, &mdev->tconn->data, P_OUT_OF_SYNC, &p.head, sizeof(p));
1758 }
1759
1760 /*
1761   drbd_send distinguishes two cases:
1762
1763   Packets sent via the data socket "sock"
1764   and packets sent via the meta data socket "msock"
1765
1766                     sock                      msock
1767   -----------------+-------------------------+------------------------------
1768   timeout           conf.timeout / 2          conf.timeout / 2
1769   timeout action    send a ping via msock     Abort communication
1770                                               and close all sockets
1771 */
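
/* Editor's note (inferred from the helpers above, not original text):
 * the bulk-data helpers in this file -- drbd_send_dblock(),
 * drbd_send_block(), drbd_send_drequest*() -- all use the "sock"
 * (tconn->data) column of this table, while the asender's ack traffic
 * presumably runs over "msock" (tconn->meta).  That is also why
 * we_should_drop_the_connection() gives up immediately when the blocked
 * socket *is* the meta socket: there is no other channel left to ping
 * over. */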
1772
1773 /*
1774  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1775  */
1776 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1777               void *buf, size_t size, unsigned msg_flags)
1778 {
1779         struct kvec iov;
1780         struct msghdr msg;
1781         int rv, sent = 0;
1782
1783         if (!sock)
1784                 return -EBADR;
1785
1786         /* THINK  if (signal_pending) return ... ? */
1787
1788         iov.iov_base = buf;
1789         iov.iov_len  = size;
1790
1791         msg.msg_name       = NULL;
1792         msg.msg_namelen    = 0;
1793         msg.msg_control    = NULL;
1794         msg.msg_controllen = 0;
1795         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1796
1797         if (sock == tconn->data.socket) {
1798                 tconn->ko_count = tconn->net_conf->ko_count;
1799                 drbd_update_congested(tconn);
1800         }
1801         do {
1802                 /* STRANGE
1803                  * tcp_sendmsg does _not_ use its size parameter at all ?
1804                  *
1805                  * -EAGAIN on timeout, -EINTR on signal.
1806                  */
1807 /* THINK
1808  * do we need to block DRBD_SIG if sock == &meta.socket ??
1809  * otherwise wake_asender() might interrupt some send_*Ack !
1810  */
1811                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1812                 if (rv == -EAGAIN) {
1813                         if (we_should_drop_the_connection(tconn, sock))
1814                                 break;
1815                         else
1816                                 continue;
1817                 }
1818                 if (rv == -EINTR) {
1819                         flush_signals(current);
1820                         rv = 0;
1821                 }
1822                 if (rv < 0)
1823                         break;
1824                 sent += rv;
1825                 iov.iov_base += rv;
1826                 iov.iov_len  -= rv;
1827         } while (sent < size);
1828
1829         if (sock == tconn->data.socket)
1830                 clear_bit(NET_CONGESTED, &tconn->flags);
1831
1832         if (rv <= 0) {
1833                 if (rv != -EAGAIN) {
1834                         conn_err(tconn, "%s_sendmsg returned %d\n",
1835                                  sock == tconn->meta.socket ? "msock" : "sock",
1836                                  rv);
1837                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1838                 } else
1839                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1840         }
1841
1842         return sent;
1843 }
1844
1845 /**
1846  * drbd_send_all  -  Send an entire buffer
1847  *
1848  * Returns 0 upon success and a negative error value otherwise.
1849  */
1850 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1851                   size_t size, unsigned msg_flags)
1852 {
1853         int err;
1854
1855         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1856         if (err < 0)
1857                 return err;
1858         if (err != size)
1859                 return -EIO;
1860         return 0;
1861 }
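
/* Editor's usage sketch (illustrative; mirrors the call sites above,
 * e.g. in drbd_send_drequest_csum()):
 *
 *     struct p_block_req p;
 *     ...
 *     err = drbd_send_all(tconn, tconn->data.socket, &p, sizeof(p), 0);
 *     if (err)
 *             return err;    (a short send has already been mapped to -EIO)
 *
 * so callers never have to handle partial sends themselves. */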
1862
1863 static int drbd_open(struct block_device *bdev, fmode_t mode)
1864 {
1865         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1866         unsigned long flags;
1867         int rv = 0;
1868
1869         mutex_lock(&drbd_main_mutex);
1870         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1871         /* to have a stable mdev->state.role
1872          * and no race with updating open_cnt */
1873
1874         if (mdev->state.role != R_PRIMARY) {
1875                 if (mode & FMODE_WRITE)
1876                         rv = -EROFS;
1877                 else if (!allow_oos)
1878                         rv = -EMEDIUMTYPE;
1879         }
1880
1881         if (!rv)
1882                 mdev->open_cnt++;
1883         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1884         mutex_unlock(&drbd_main_mutex);
1885
1886         return rv;
1887 }
1888
1889 static int drbd_release(struct gendisk *gd, fmode_t mode)
1890 {
1891         struct drbd_conf *mdev = gd->private_data;
1892         mutex_lock(&drbd_main_mutex);
1893         mdev->open_cnt--;
1894         mutex_unlock(&drbd_main_mutex);
1895         return 0;
1896 }
1897
1898 static void drbd_set_defaults(struct drbd_conf *mdev)
1899 {
1900         /* Beware! The actual layout differs
1901          * between big endian and little endian */
1902         mdev->state = (union drbd_dev_state) {
1903                 { .role = R_SECONDARY,
1904                   .peer = R_UNKNOWN,
1905                   .conn = C_STANDALONE,
1906                   .disk = D_DISKLESS,
1907                   .pdsk = D_UNKNOWN,
1908                 } };
1909 }
1910
1911 void drbd_init_set_defaults(struct drbd_conf *mdev)
1912 {
1913         /* the memset(,0,) did most of this.
1914          * note: only assignments, no allocation in here */
1915
1916         drbd_set_defaults(mdev);
1917
1918         atomic_set(&mdev->ap_bio_cnt, 0);
1919         atomic_set(&mdev->ap_pending_cnt, 0);
1920         atomic_set(&mdev->rs_pending_cnt, 0);
1921         atomic_set(&mdev->unacked_cnt, 0);
1922         atomic_set(&mdev->local_cnt, 0);
1923         atomic_set(&mdev->pp_in_use_by_net, 0);
1924         atomic_set(&mdev->rs_sect_in, 0);
1925         atomic_set(&mdev->rs_sect_ev, 0);
1926         atomic_set(&mdev->ap_in_flight, 0);
1927
1928         mutex_init(&mdev->md_io_mutex);
1929         mutex_init(&mdev->own_state_mutex);
1930         mdev->state_mutex = &mdev->own_state_mutex;
1931
1932         spin_lock_init(&mdev->al_lock);
1933         spin_lock_init(&mdev->peer_seq_lock);
1934         spin_lock_init(&mdev->epoch_lock);
1935
1936         INIT_LIST_HEAD(&mdev->active_ee);
1937         INIT_LIST_HEAD(&mdev->sync_ee);
1938         INIT_LIST_HEAD(&mdev->done_ee);
1939         INIT_LIST_HEAD(&mdev->read_ee);
1940         INIT_LIST_HEAD(&mdev->net_ee);
1941         INIT_LIST_HEAD(&mdev->resync_reads);
1942         INIT_LIST_HEAD(&mdev->resync_work.list);
1943         INIT_LIST_HEAD(&mdev->unplug_work.list);
1944         INIT_LIST_HEAD(&mdev->go_diskless.list);
1945         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1946         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1947         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1948
1949         mdev->resync_work.cb  = w_resync_timer;
1950         mdev->unplug_work.cb  = w_send_write_hint;
1951         mdev->go_diskless.cb  = w_go_diskless;
1952         mdev->md_sync_work.cb = w_md_sync;
1953         mdev->bm_io_work.w.cb = w_bitmap_io;
1954         mdev->start_resync_work.cb = w_start_resync;
1955
1956         mdev->resync_work.mdev  = mdev;
1957         mdev->unplug_work.mdev  = mdev;
1958         mdev->go_diskless.mdev  = mdev;
1959         mdev->md_sync_work.mdev = mdev;
1960         mdev->bm_io_work.w.mdev = mdev;
1961         mdev->start_resync_work.mdev = mdev;
1962
1963         init_timer(&mdev->resync_timer);
1964         init_timer(&mdev->md_sync_timer);
1965         init_timer(&mdev->start_resync_timer);
1966         init_timer(&mdev->request_timer);
1967         mdev->resync_timer.function = resync_timer_fn;
1968         mdev->resync_timer.data = (unsigned long) mdev;
1969         mdev->md_sync_timer.function = md_sync_timer_fn;
1970         mdev->md_sync_timer.data = (unsigned long) mdev;
1971         mdev->start_resync_timer.function = start_resync_timer_fn;
1972         mdev->start_resync_timer.data = (unsigned long) mdev;
1973         mdev->request_timer.function = request_timer_fn;
1974         mdev->request_timer.data = (unsigned long) mdev;
1975
1976         init_waitqueue_head(&mdev->misc_wait);
1977         init_waitqueue_head(&mdev->state_wait);
1978         init_waitqueue_head(&mdev->ee_wait);
1979         init_waitqueue_head(&mdev->al_wait);
1980         init_waitqueue_head(&mdev->seq_wait);
1981
1982         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1983         mdev->write_ordering = WO_bdev_flush;
1984         mdev->resync_wenr = LC_FREE;
1985         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1986         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1987 }
1988
1989 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1990 {
1991         int i;
1992         if (mdev->tconn->receiver.t_state != NONE)
1993                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1994                                 mdev->tconn->receiver.t_state);
1995
1996         /* no need to lock it, I'm the only thread alive */
1997         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1998                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1999         mdev->al_writ_cnt  =
2000         mdev->bm_writ_cnt  =
2001         mdev->read_cnt     =
2002         mdev->recv_cnt     =
2003         mdev->send_cnt     =
2004         mdev->writ_cnt     =
2005         mdev->p_size       =
2006         mdev->rs_start     =
2007         mdev->rs_total     =
2008         mdev->rs_failed    = 0;
2009         mdev->rs_last_events = 0;
2010         mdev->rs_last_sect_ev = 0;
2011         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2012                 mdev->rs_mark_left[i] = 0;
2013                 mdev->rs_mark_time[i] = 0;
2014         }
2015         D_ASSERT(mdev->tconn->net_conf == NULL);
2016
2017         drbd_set_my_capacity(mdev, 0);
2018         if (mdev->bitmap) {
2019                 /* maybe never allocated. */
2020                 drbd_bm_resize(mdev, 0, 1);
2021                 drbd_bm_cleanup(mdev);
2022         }
2023
2024         drbd_free_resources(mdev);
2025         clear_bit(AL_SUSPENDED, &mdev->flags);
2026
2027         /*
2028          * currently we call drbd_init_ee only on module load, so
2029          * we may call drbd_release_ee only on module unload!
2030          */
2031         D_ASSERT(list_empty(&mdev->active_ee));
2032         D_ASSERT(list_empty(&mdev->sync_ee));
2033         D_ASSERT(list_empty(&mdev->done_ee));
2034         D_ASSERT(list_empty(&mdev->read_ee));
2035         D_ASSERT(list_empty(&mdev->net_ee));
2036         D_ASSERT(list_empty(&mdev->resync_reads));
2037         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2038         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2039         D_ASSERT(list_empty(&mdev->resync_work.list));
2040         D_ASSERT(list_empty(&mdev->unplug_work.list));
2041         D_ASSERT(list_empty(&mdev->go_diskless.list));
2042
2043         drbd_set_defaults(mdev);
2044 }
2045
2046
2047 static void drbd_destroy_mempools(void)
2048 {
2049         struct page *page;
2050
2051         while (drbd_pp_pool) {
2052                 page = drbd_pp_pool;
2053                 drbd_pp_pool = (struct page *)page_private(page);
2054                 __free_page(page);
2055                 drbd_pp_vacant--;
2056         }
2057
2058         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2059
2060         if (drbd_md_io_bio_set)
2061                 bioset_free(drbd_md_io_bio_set);
2062         if (drbd_md_io_page_pool)
2063                 mempool_destroy(drbd_md_io_page_pool);
2064         if (drbd_ee_mempool)
2065                 mempool_destroy(drbd_ee_mempool);
2066         if (drbd_request_mempool)
2067                 mempool_destroy(drbd_request_mempool);
2068         if (drbd_ee_cache)
2069                 kmem_cache_destroy(drbd_ee_cache);
2070         if (drbd_request_cache)
2071                 kmem_cache_destroy(drbd_request_cache);
2072         if (drbd_bm_ext_cache)
2073                 kmem_cache_destroy(drbd_bm_ext_cache);
2074         if (drbd_al_ext_cache)
2075                 kmem_cache_destroy(drbd_al_ext_cache);
2076
2077         drbd_md_io_bio_set   = NULL;
2078         drbd_md_io_page_pool = NULL;
2079         drbd_ee_mempool      = NULL;
2080         drbd_request_mempool = NULL;
2081         drbd_ee_cache        = NULL;
2082         drbd_request_cache   = NULL;
2083         drbd_bm_ext_cache    = NULL;
2084         drbd_al_ext_cache    = NULL;
2085
2086         return;
2087 }
2088
2089 static int drbd_create_mempools(void)
2090 {
2091         struct page *page;
2092         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2093         int i;
2094
2095         /* prepare our caches and mempools */
2096         drbd_request_mempool = NULL;
2097         drbd_ee_cache        = NULL;
2098         drbd_request_cache   = NULL;
2099         drbd_bm_ext_cache    = NULL;
2100         drbd_al_ext_cache    = NULL;
2101         drbd_pp_pool         = NULL;
2102         drbd_md_io_page_pool = NULL;
2103         drbd_md_io_bio_set   = NULL;
2104
2105         /* caches */
2106         drbd_request_cache = kmem_cache_create(
2107                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2108         if (drbd_request_cache == NULL)
2109                 goto Enomem;
2110
2111         drbd_ee_cache = kmem_cache_create(
2112                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2113         if (drbd_ee_cache == NULL)
2114                 goto Enomem;
2115
2116         drbd_bm_ext_cache = kmem_cache_create(
2117                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2118         if (drbd_bm_ext_cache == NULL)
2119                 goto Enomem;
2120
2121         drbd_al_ext_cache = kmem_cache_create(
2122                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2123         if (drbd_al_ext_cache == NULL)
2124                 goto Enomem;
2125
2126         /* mempools */
2127         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2128         if (drbd_md_io_bio_set == NULL)
2129                 goto Enomem;
2130
2131         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2132         if (drbd_md_io_page_pool == NULL)
2133                 goto Enomem;
2134
2135         drbd_request_mempool = mempool_create(number,
2136                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2137         if (drbd_request_mempool == NULL)
2138                 goto Enomem;
2139
2140         drbd_ee_mempool = mempool_create(number,
2141                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2142         if (drbd_ee_mempool == NULL)
2143                 goto Enomem;
2144
2145         /* drbd's page pool */
2146         spin_lock_init(&drbd_pp_lock);
2147
2148         for (i = 0; i < number; i++) {
2149                 page = alloc_page(GFP_HIGHUSER);
2150                 if (!page)
2151                         goto Enomem;
2152                 set_page_private(page, (unsigned long)drbd_pp_pool);
2153                 drbd_pp_pool = page;
2154         }
2155         drbd_pp_vacant = number;
2156
2157         return 0;
2158
2159 Enomem:
2160         drbd_destroy_mempools(); /* in case we allocated some */
2161         return -ENOMEM;
2162 }
2163
2164 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2165         void *unused)
2166 {
2167         /* just so we have it.  you never know what interesting things we
2168          * might want to do here some day...
2169          */
2170
2171         return NOTIFY_DONE;
2172 }
2173
2174 static struct notifier_block drbd_notifier = {
2175         .notifier_call = drbd_notify_sys,
2176 };
2177
2178 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2179 {
2180         int rr;
2181
2182         rr = drbd_release_ee(mdev, &mdev->active_ee);
2183         if (rr)
2184                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2185
2186         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2187         if (rr)
2188                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2189
2190         rr = drbd_release_ee(mdev, &mdev->read_ee);
2191         if (rr)
2192                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2193
2194         rr = drbd_release_ee(mdev, &mdev->done_ee);
2195         if (rr)
2196                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2197
2198         rr = drbd_release_ee(mdev, &mdev->net_ee);
2199         if (rr)
2200                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2201 }
2202
2203 /* caution. no locking. */
2204 void drbd_delete_device(unsigned int minor)
2205 {
2206         struct drbd_conf *mdev = minor_to_mdev(minor);
2207
2208         if (!mdev)
2209                 return;
2210
2211         idr_remove(&mdev->tconn->volumes, mdev->vnr);
2212         idr_remove(&minors, minor);
2213         synchronize_rcu();
2214
2215         /* paranoia asserts */
2216         D_ASSERT(mdev->open_cnt == 0);
2217         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2218         /* end paranoia asserts */
2219
2220         del_gendisk(mdev->vdisk);
2221
2222         /* cleanup stuff that may have been allocated during
2223          * device (re-)configuration or state changes */
2224
2225         if (mdev->this_bdev)
2226                 bdput(mdev->this_bdev);
2227
2228         drbd_free_resources(mdev);
2229
2230         drbd_release_ee_lists(mdev);
2231
2232         lc_destroy(mdev->act_log);
2233         lc_destroy(mdev->resync);
2234
2235         kfree(mdev->p_uuid);
2236         /* mdev->p_uuid = NULL; */
2237
2238         /* cleanup the rest that has been
2239          * allocated from drbd_new_device
2240          * and actually free the mdev itself */
2241         drbd_free_mdev(mdev);
2242 }
2243
2244 static void drbd_cleanup(void)
2245 {
2246         unsigned int i;
2247         struct drbd_conf *mdev;
2248
2249         unregister_reboot_notifier(&drbd_notifier);
2250
2251         /* first remove proc,
2252          * drbdsetup uses its presence to detect
2253          * whether DRBD is loaded.
2254          * If we got stuck in proc removal,
2255          * but had netlink already deregistered,
2256          * some drbdsetup commands may wait forever
2257          * for an answer.
2258          */
2259         if (drbd_proc)
2260                 remove_proc_entry("drbd", NULL);
2261
2262         drbd_genl_unregister();
2263
2264         idr_for_each_entry(&minors, mdev, i)
2265                 drbd_delete_device(i);
2266         drbd_destroy_mempools();
2267         unregister_blkdev(DRBD_MAJOR, "drbd");
2268
2269         idr_destroy(&minors);
2270
2271         printk(KERN_INFO "drbd: module cleanup done.\n");
2272 }
2273
2274 /**
2275  * drbd_congested() - Callback for pdflush
2276  * @congested_data:     User data
2277  * @bdi_bits:           Bits pdflush is currently interested in
2278  *
2279  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2280  */
2281 static int drbd_congested(void *congested_data, int bdi_bits)
2282 {
2283         struct drbd_conf *mdev = congested_data;
2284         struct request_queue *q;
2285         char reason = '-';
2286         int r = 0;
2287
2288         if (!may_inc_ap_bio(mdev)) {
2289                 /* DRBD has frozen IO */
2290                 r = bdi_bits;
2291                 reason = 'd';
2292                 goto out;
2293         }
2294
2295         if (get_ldev(mdev)) {
2296                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2297                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2298                 put_ldev(mdev);
2299                 if (r)
2300                         reason = 'b';
2301         }
2302
2303         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2304                 r |= (1 << BDI_async_congested);
2305                 reason = reason == 'b' ? 'a' : 'n';
2306         }
2307
2308 out:
2309         mdev->congestion_reason = reason;
2310         return r;
2311 }
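
/* Editor's legend (derived from the code above, not original docs):
 * congestion_reason ends up as one character --
 *   'd'  IO currently frozen/suspended by DRBD (may_inc_ap_bio() failed)
 *   'b'  the local backing device is congested
 *   'a'  backing device and network both congested (async bit set)
 *   'n'  only the network (data socket) is congested
 *   '-'  not congested
 * -- and is presumably what the congestion flag in /proc/drbd shows. */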
2312
2313 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2314 {
2315         sema_init(&wq->s, 0);
2316         spin_lock_init(&wq->q_lock);
2317         INIT_LIST_HEAD(&wq->q);
2318 }
2319
2320 struct drbd_tconn *conn_by_name(const char *name)
2321 {
2322         struct drbd_tconn *tconn;
2323
2324         if (!name || !name[0])
2325                 return NULL;
2326
2327         mutex_lock(&drbd_cfg_mutex);
2328         list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2329                 if (!strcmp(tconn->name, name))
2330                         goto found;
2331         }
2332         tconn = NULL;
2333 found:
2334         mutex_unlock(&drbd_cfg_mutex);
2335         return tconn;
2336 }
2337
2338 static int drbd_alloc_socket(struct drbd_socket *socket)
2339 {
2340         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2341         if (!socket->rbuf)
2342                 return -ENOMEM;
2343         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2344         if (!socket->sbuf)
2345                 return -ENOMEM;
2346         return 0;
2347 }
2348
2349 static void drbd_free_socket(struct drbd_socket *socket)
2350 {
2351         free_page((unsigned long) socket->sbuf);
2352         free_page((unsigned long) socket->rbuf);
2353 }
2354
2355 struct drbd_tconn *drbd_new_tconn(const char *name)
2356 {
2357         struct drbd_tconn *tconn;
2358
2359         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2360         if (!tconn)
2361                 return NULL;
2362
2363         tconn->name = kstrdup(name, GFP_KERNEL);
2364         if (!tconn->name)
2365                 goto fail;
2366
2367         if (drbd_alloc_socket(&tconn->data))
2368                 goto fail;
2369         if (drbd_alloc_socket(&tconn->meta))
2370                 goto fail;
2371
2372         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2373                 goto fail;
2374
2375         if (!tl_init(tconn))
2376                 goto fail;
2377
2378         tconn->cstate = C_STANDALONE;
2379         mutex_init(&tconn->cstate_mutex);
2380         spin_lock_init(&tconn->req_lock);
2381         atomic_set(&tconn->net_cnt, 0);
2382         init_waitqueue_head(&tconn->net_cnt_wait);
2383         init_waitqueue_head(&tconn->ping_wait);
2384         idr_init(&tconn->volumes);
2385
2386         drbd_init_workqueue(&tconn->data.work);
2387         mutex_init(&tconn->data.mutex);
2388
2389         drbd_init_workqueue(&tconn->meta.work);
2390         mutex_init(&tconn->meta.mutex);
2391
2392         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2393         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2394         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2395
2396         tconn->res_opts = (struct res_opts) {
2397                 {}, 0, /* cpu_mask */
2398                 DRBD_ON_NO_DATA_DEF, /* on_no_data */
2399         };
2400
2401         mutex_lock(&drbd_cfg_mutex);
2402         list_add_tail(&tconn->all_tconn, &drbd_tconns);
2403         mutex_unlock(&drbd_cfg_mutex);
2404
2405         return tconn;
2406
2407 fail:
2408         tl_cleanup(tconn);
2409         free_cpumask_var(tconn->cpu_mask);
2410         drbd_free_socket(&tconn->meta);
2411         drbd_free_socket(&tconn->data);
2412         kfree(tconn->name);
2413         kfree(tconn);
2414
2415         return NULL;
2416 }
2417
2418 void drbd_free_tconn(struct drbd_tconn *tconn)
2419 {
2420         list_del(&tconn->all_tconn);
2421         idr_destroy(&tconn->volumes);
2422
2423         free_cpumask_var(tconn->cpu_mask);
2424         drbd_free_socket(&tconn->meta);
2425         drbd_free_socket(&tconn->data);
2426         kfree(tconn->name);
2427         kfree(tconn->int_dig_out);
2428         kfree(tconn->int_dig_in);
2429         kfree(tconn->int_dig_vv);
2430         kfree(tconn);
2431 }
2432
2433 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2434 {
2435         struct drbd_conf *mdev;
2436         struct gendisk *disk;
2437         struct request_queue *q;
2438         int vnr_got = vnr;
2439         int minor_got = minor;
2440         enum drbd_ret_code err = ERR_NOMEM;
2441
2442         mdev = minor_to_mdev(minor);
2443         if (mdev)
2444                 return ERR_MINOR_EXISTS;
2445
2446         /* GFP_KERNEL, we are outside of all write-out paths */
2447         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2448         if (!mdev)
2449                 return ERR_NOMEM;
2450
2451         mdev->tconn = tconn;
2452         mdev->minor = minor;
2453         mdev->vnr = vnr;
2454
2455         drbd_init_set_defaults(mdev);
2456
2457         q = blk_alloc_queue(GFP_KERNEL);
2458         if (!q)
2459                 goto out_no_q;
2460         mdev->rq_queue = q;
2461         q->queuedata   = mdev;
2462
2463         disk = alloc_disk(1);
2464         if (!disk)
2465                 goto out_no_disk;
2466         mdev->vdisk = disk;
2467
2468         set_disk_ro(disk, true);
2469
2470         disk->queue = q;
2471         disk->major = DRBD_MAJOR;
2472         disk->first_minor = minor;
2473         disk->fops = &drbd_ops;
2474         sprintf(disk->disk_name, "drbd%d", minor);
2475         disk->private_data = mdev;
2476
2477         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2478         /* we have no partitions. we contain only ourselves. */
2479         mdev->this_bdev->bd_contains = mdev->this_bdev;
2480
2481         q->backing_dev_info.congested_fn = drbd_congested;
2482         q->backing_dev_info.congested_data = mdev;
2483
2484         blk_queue_make_request(q, drbd_make_request);
2485         /* Setting max_hw_sectors to the odd value of 8 KiB here.
2486            This triggers a max_bio_size message upon first attach or connect */
2487         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2488         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2489         blk_queue_merge_bvec(q, drbd_merge_bvec);
2490         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2491
2492         mdev->md_io_page = alloc_page(GFP_KERNEL);
2493         if (!mdev->md_io_page)
2494                 goto out_no_io_page;
2495
2496         if (drbd_bm_init(mdev))
2497                 goto out_no_bitmap;
2498         mdev->read_requests = RB_ROOT;
2499         mdev->write_requests = RB_ROOT;
2500
2501         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2502         if (!mdev->current_epoch)
2503                 goto out_no_epoch;
2504
2505         INIT_LIST_HEAD(&mdev->current_epoch->list);
2506         mdev->epochs = 1;
2507
2508         if (!idr_pre_get(&minors, GFP_KERNEL))
2509                 goto out_no_minor_idr;
2510         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2511                 goto out_no_minor_idr;
2512         if (minor_got != minor) {
2513                 err = ERR_MINOR_EXISTS;
2514                 drbd_msg_put_info("requested minor exists already");
2515                 goto out_idr_remove_minor;
2516         }
2517
2518         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2519                 goto out_idr_remove_minor;
2520         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2521                 goto out_idr_remove_minor;
2522         if (vnr_got != vnr) {
2523                 err = ERR_INVALID_REQUEST;
2524                 drbd_msg_put_info("requested volume exists already");
2525                 goto out_idr_remove_vol;
2526         }
2527         add_disk(disk);
2528
2529         /* inherit the connection state */
2530         mdev->state.conn = tconn->cstate;
2531         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2532                 drbd_connected(vnr, mdev, tconn);
2533
2534         return NO_ERROR;
2535
2536 out_idr_remove_vol:
2537         idr_remove(&tconn->volumes, vnr_got);
2538 out_idr_remove_minor:
2539         idr_remove(&minors, minor_got);
2540         synchronize_rcu();
2541 out_no_minor_idr:
2542         kfree(mdev->current_epoch);
2543 out_no_epoch:
2544         drbd_bm_cleanup(mdev);
2545 out_no_bitmap:
2546         __free_page(mdev->md_io_page);
2547 out_no_io_page:
2548         put_disk(disk);
2549 out_no_disk:
2550         blk_cleanup_queue(q);
2551 out_no_q:
2552         kfree(mdev);
2553         return err;
2554 }
2555
2556 /* counterpart of drbd_new_device.
2557  * last part of drbd_delete_device. */
2558 void drbd_free_mdev(struct drbd_conf *mdev)
2559 {
2560         kfree(mdev->current_epoch);
2561         if (mdev->bitmap) /* should no longer be there. */
2562                 drbd_bm_cleanup(mdev);
2563         __free_page(mdev->md_io_page);
2564         put_disk(mdev->vdisk);
2565         blk_cleanup_queue(mdev->rq_queue);
2566         kfree(mdev);
2567 }
2568
2569
2570 int __init drbd_init(void)
2571 {
2572         int err;
2573
2574         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2575         BUILD_BUG_ON(sizeof(struct p_connection_features) != 80);
2576
2577         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2578                 printk(KERN_ERR
2579                        "drbd: invalid minor_count (%d)\n", minor_count);
2580 #ifdef MODULE
2581                 return -EINVAL;
2582 #else
2583                 minor_count = 8;
2584 #endif
2585         }
2586
2587         err = register_blkdev(DRBD_MAJOR, "drbd");
2588         if (err) {
2589                 printk(KERN_ERR
2590                        "drbd: unable to register block device major %d\n",
2591                        DRBD_MAJOR);
2592                 return err;
2593         }
2594
2595         err = drbd_genl_register();
2596         if (err) {
2597                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2598                 goto fail;
2599         }
2600
2601
2602         register_reboot_notifier(&drbd_notifier);
2603
2604         /*
2605          * allocate all necessary structs
2606          */
2607         err = -ENOMEM;
2608
2609         init_waitqueue_head(&drbd_pp_wait);
2610
2611         drbd_proc = NULL; /* play safe for drbd_cleanup */
2612         idr_init(&minors);
2613
2614         err = drbd_create_mempools();
2615         if (err)
2616                 goto fail;
2617
2618         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2619         if (!drbd_proc) {
2620                 printk(KERN_ERR "drbd: unable to register proc file\n");
2621                 goto fail;
2622         }
2623
2624         rwlock_init(&global_state_lock);
2625         INIT_LIST_HEAD(&drbd_tconns);
2626
2627         printk(KERN_INFO "drbd: initialized. "
2628                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2629                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2630         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2631         printk(KERN_INFO "drbd: registered as block device major %d\n",
2632                 DRBD_MAJOR);
2633
2634         return 0; /* Success! */
2635
2636 fail:
2637         drbd_cleanup();
2638         if (err == -ENOMEM)
2639                 /* currently always the case */
2640                 printk(KERN_ERR "drbd: ran out of memory\n");
2641         else
2642                 printk(KERN_ERR "drbd: initialization failure\n");
2643         return err;
2644 }
2645
2646 void drbd_free_bc(struct drbd_backing_dev *ldev)
2647 {
2648         if (ldev == NULL)
2649                 return;
2650
2651         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2652         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2653
2654         kfree(ldev);
2655 }
2656
2657 void drbd_free_sock(struct drbd_tconn *tconn)
2658 {
2659         if (tconn->data.socket) {
2660                 mutex_lock(&tconn->data.mutex);
2661                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2662                 sock_release(tconn->data.socket);
2663                 tconn->data.socket = NULL;
2664                 mutex_unlock(&tconn->data.mutex);
2665         }
2666         if (tconn->meta.socket) {
2667                 mutex_lock(&tconn->meta.mutex);
2668                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2669                 sock_release(tconn->meta.socket);
2670                 tconn->meta.socket = NULL;
2671                 mutex_unlock(&tconn->meta.mutex);
2672         }
2673 }
2674
2675
2676 void drbd_free_resources(struct drbd_conf *mdev)
2677 {
2678         crypto_free_hash(mdev->tconn->csums_tfm);
2679         mdev->tconn->csums_tfm = NULL;
2680         crypto_free_hash(mdev->tconn->verify_tfm);
2681         mdev->tconn->verify_tfm = NULL;
2682         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2683         mdev->tconn->cram_hmac_tfm = NULL;
2684         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2685         mdev->tconn->integrity_w_tfm = NULL;
2686         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2687         mdev->tconn->integrity_r_tfm = NULL;
2688
2689         drbd_free_sock(mdev->tconn);
2690
2691         __no_warn(local,
2692                   drbd_free_bc(mdev->ldev);
2693                   mdev->ldev = NULL;);
2694 }
2695
2696 /* meta data management */
2697
2698 struct meta_data_on_disk {
2699         u64 la_size;           /* last agreed size. */
2700         u64 uuid[UI_SIZE];   /* UUIDs. */
2701         u64 device_uuid;
2702         u64 reserved_u64_1;
2703         u32 flags;             /* MDF */
2704         u32 magic;
2705         u32 md_size_sect;
2706         u32 al_offset;         /* offset to this block */
2707         u32 al_nr_extents;     /* important for restoring the AL */
2708               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2709         u32 bm_offset;         /* offset to the bitmap, from here */
2710         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2711         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2712         u32 reserved_u32[3];
2713
2714 } __packed;
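
/* Editor's note (assumptions flagged): every field above is stored
 * big-endian on disk -- drbd_md_sync() below converts with cpu_to_be*()
 * and drbd_md_read() converts back with be*_to_cpu().  The buffer that
 * is actually written is a full 512-byte sector (memset to 0 first), so
 * whatever this struct does not cover is presumably left as zero
 * padding for future extensions. */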
2715
2716 /**
2717  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2718  * @mdev:       DRBD device.
2719  */
2720 void drbd_md_sync(struct drbd_conf *mdev)
2721 {
2722         struct meta_data_on_disk *buffer;
2723         sector_t sector;
2724         int i;
2725
2726         del_timer(&mdev->md_sync_timer);
2727         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2728         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2729                 return;
2730
2731         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
2732          * metadata even if we detach due to a disk failure! */
2733         if (!get_ldev_if_state(mdev, D_FAILED))
2734                 return;
2735
2736         mutex_lock(&mdev->md_io_mutex);
2737         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2738         memset(buffer, 0, 512);
2739
2740         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2741         for (i = UI_CURRENT; i < UI_SIZE; i++)
2742                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2743         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2744         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2745
2746         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2747         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2748         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2749         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2750         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2751
2752         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2753         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2754
2755         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2756         sector = mdev->ldev->md.md_offset;
2757
2758         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2759                 /* this was a try anyway ... */
2760                 dev_err(DEV, "meta data update failed!\n");
2761                 drbd_chk_io_error(mdev, 1, true);
2762         }
2763
2764         /* Update mdev->ldev->md.la_size_sect,
2765          * since we updated it on metadata. */
2766         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2767
2768         mutex_unlock(&mdev->md_io_mutex);
2769         put_ldev(mdev);
2770 }
2771
2772 /**
2773  * drbd_md_read() - Reads in the meta data super block
2774  * @mdev:       DRBD device.
2775  * @bdev:       Device from which the meta data should be read in.
2776  *
2777  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2778  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2779  */
2780 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2781 {
2782         struct meta_data_on_disk *buffer;
2783         int i, rv = NO_ERROR;
2784
2785         if (!get_ldev_if_state(mdev, D_ATTACHING))
2786                 return ERR_IO_MD_DISK;
2787
2788         mutex_lock(&mdev->md_io_mutex);
2789         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2790
2791         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2792                 /* NOTE: can't do normal error processing here as this is
2793                    called BEFORE disk is attached */
2794                 dev_err(DEV, "Error while reading metadata.\n");
2795                 rv = ERR_IO_MD_DISK;
2796                 goto err;
2797         }
2798
2799         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2800                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2801                 rv = ERR_MD_INVALID;
2802                 goto err;
2803         }
2804         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2805                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2806                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2807                 rv = ERR_MD_INVALID;
2808                 goto err;
2809         }
2810         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2811                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2812                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2813                 rv = ERR_MD_INVALID;
2814                 goto err;
2815         }
2816         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2817                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2818                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2819                 rv = ERR_MD_INVALID;
2820                 goto err;
2821         }
2822
2823         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2824                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2825                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2826                 rv = ERR_MD_INVALID;
2827                 goto err;
2828         }
2829
2830         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2831         for (i = UI_CURRENT; i < UI_SIZE; i++)
2832                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2833         bdev->md.flags = be32_to_cpu(buffer->flags);
2834         bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
2835         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2836
2837         spin_lock_irq(&mdev->tconn->req_lock);
2838         if (mdev->state.conn < C_CONNECTED) {
2839                 int peer;
2840                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2841                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2842                 mdev->peer_max_bio_size = peer;
2843         }
2844         spin_unlock_irq(&mdev->tconn->req_lock);
2845
2846         if (bdev->dc.al_extents < 7)
2847                 bdev->dc.al_extents = 127;
2848
2849  err:
2850         mutex_unlock(&mdev->md_io_mutex);
2851         put_ldev(mdev);
2852
2853         return rv;
2854 }
2855
2856 /**
2857  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2858  * @mdev:       DRBD device.
2859  *
2860  * Call this function if you change anything that should be written to
2861  * the meta-data super block. This function sets MD_DIRTY and starts a
2862  * timer that makes sure drbd_md_sync() gets called within five seconds.
2863  */
2864 #ifdef DEBUG
2865 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2866 {
2867         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2868                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2869                 mdev->last_md_mark_dirty.line = line;
2870                 mdev->last_md_mark_dirty.func = func;
2871         }
2872 }
2873 #else
2874 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2875 {
2876         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2877                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2878 }
2879 #endif
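
/* Editor's usage sketch (illustrative; mirrors _drbd_uuid_set() below):
 *
 *     mdev->ldev->md.uuid[idx] = val;
 *     drbd_md_mark_dirty(mdev);
 *
 * after which the superblock reaches disk within five seconds via the
 * md_sync_timer, or earlier if someone calls drbd_md_sync() directly. */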
2880
2881 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2882 {
2883         int i;
2884
2885         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2886                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2887 }
2888
2889 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2890 {
2891         if (idx == UI_CURRENT) {
2892                 if (mdev->state.role == R_PRIMARY)
2893                         val |= 1;
2894                 else
2895                         val &= ~((u64)1);
2896
2897                 drbd_set_ed_uuid(mdev, val);
2898         }
2899
2900         mdev->ldev->md.uuid[idx] = val;
2901         drbd_md_mark_dirty(mdev);
2902 }
2903
2904
2905 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2906 {
2907         if (mdev->ldev->md.uuid[idx]) {
2908                 drbd_uuid_move_history(mdev);
2909                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2910         }
2911         _drbd_uuid_set(mdev, idx, val);
2912 }
2913
2914 /**
2915  * drbd_uuid_new_current() - Creates a new current UUID
2916  * @mdev:       DRBD device.
2917  *
2918  * Creates a new current UUID, and rotates the old current UUID into
2919  * the bitmap slot. Causes an incremental resync upon next connect.
2920  */
2921 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2922 {
2923         u64 val;
2924         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2925
2926         if (bm_uuid)
2927                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2928
2929         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2930
2931         get_random_bytes(&val, sizeof(u64));
2932         _drbd_uuid_set(mdev, UI_CURRENT, val);
2933         drbd_print_uuids(mdev, "new current UUID");
2934         /* get it to stable storage _now_ */
2935         drbd_md_sync(mdev);
2936 }
2937
2938 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2939 {
2940         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2941                 return;
2942
2943         if (val == 0) {
2944                 drbd_uuid_move_history(mdev);
2945                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2946                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2947         } else {
2948                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2949                 if (bm_uuid)
2950                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2951
2952                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2953         }
2954         drbd_md_mark_dirty(mdev);
2955 }
2956
2957 /**
2958  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2959  * @mdev:       DRBD device.
2960  *
2961  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2962  */
2963 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2964 {
2965         int rv = -EIO;
2966
2967         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2968                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2969                 drbd_md_sync(mdev);
2970                 drbd_bm_set_all(mdev);
2971
2972                 rv = drbd_bm_write(mdev);
2973
2974                 if (!rv) {
2975                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2976                         drbd_md_sync(mdev);
2977                 }
2978
2979                 put_ldev(mdev);
2980         }
2981
2982         return rv;
2983 }
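
/*
 * Illustrative sketch (not part of the driver): this is meant to be passed as
 * the io_fn of drbd_bitmap_io() or drbd_queue_bitmap_io(), e.g. when a full
 * sync is forced.  The reason string is a placeholder; BM_LOCKED_MASK is
 * assumed to be one of the enum bm_flag values from drbd_int.h.
 *
 *	int err = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *				 "set_n_write from example", BM_LOCKED_MASK);
 *	if (err)
 *		dev_err(DEV, "full sync bitmap write out failed\n");
 */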
2984
2985 /**
2986  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2987  * @mdev:       DRBD device.
2988  *
2989  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2990  */
2991 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2992 {
2993         int rv = -EIO;
2994
2995         drbd_resume_al(mdev);
2996         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2997                 drbd_bm_clear_all(mdev);
2998                 rv = drbd_bm_write(mdev);
2999                 put_ldev(mdev);
3000         }
3001
3002         return rv;
3003 }
3004
3005 static int w_bitmap_io(struct drbd_work *w, int unused)
3006 {
3007         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3008         struct drbd_conf *mdev = w->mdev;
3009         int rv = -EIO;
3010
3011         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3012
3013         if (get_ldev(mdev)) {
3014                 drbd_bm_lock(mdev, work->why, work->flags);
3015                 rv = work->io_fn(mdev);
3016                 drbd_bm_unlock(mdev);
3017                 put_ldev(mdev);
3018         }
3019
3020         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3021         wake_up(&mdev->misc_wait);
3022
3023         if (work->done)
3024                 work->done(mdev, rv);
3025
3026         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3027         work->why = NULL;
3028         work->flags = 0;
3029
3030         return 0;
3031 }
3032
3033 void drbd_ldev_destroy(struct drbd_conf *mdev)
3034 {
3035         lc_destroy(mdev->resync);
3036         mdev->resync = NULL;
3037         lc_destroy(mdev->act_log);
3038         mdev->act_log = NULL;
3039         __no_warn(local,
3040                 drbd_free_bc(mdev->ldev);
3041                 mdev->ldev = NULL;);
3042
3043         clear_bit(GO_DISKLESS, &mdev->flags);
3044 }
3045
3046 static int w_go_diskless(struct drbd_work *w, int unused)
3047 {
3048         struct drbd_conf *mdev = w->mdev;
3049
3050         D_ASSERT(mdev->state.disk == D_FAILED);
3051         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3052          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3053          * the protected members anymore, though, so once put_ldev reaches zero
3054          * again, it will be safe to free them. */
3055         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3056         return 0;
3057 }
3058
3059 void drbd_go_diskless(struct drbd_conf *mdev)
3060 {
3061         D_ASSERT(mdev->state.disk == D_FAILED);
3062         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3063                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3064 }
3065
3066 /**
3067  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3068  * @mdev:       DRBD device.
3069  * @io_fn:      IO callback to be called when bitmap IO is possible
3070  * @done:       callback to be called after the bitmap IO was performed
3071  * @why:        Descriptive text of the reason for doing the IO
3072  *
3073  * While IO on the bitmap happens we freeze application IO, thus ensuring
3074  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3075  * called from worker context. It MUST NOT be used while a previous such
3076  * work is still pending!
3077  */
3078 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3079                           int (*io_fn)(struct drbd_conf *),
3080                           void (*done)(struct drbd_conf *, int),
3081                           char *why, enum bm_flag flags)
3082 {
3083         D_ASSERT(current == mdev->tconn->worker.task);
3084
3085         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3086         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3087         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3088         if (mdev->bm_io_work.why)
3089                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3090                         why, mdev->bm_io_work.why);
3091
3092         mdev->bm_io_work.io_fn = io_fn;
3093         mdev->bm_io_work.done = done;
3094         mdev->bm_io_work.why = why;
3095         mdev->bm_io_work.flags = flags;
3096
3097         spin_lock_irq(&mdev->tconn->req_lock);
3098         set_bit(BITMAP_IO, &mdev->flags);
3099         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3100                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3101                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3102         }
3103         spin_unlock_irq(&mdev->tconn->req_lock);
3104 }
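
/*
 * Illustrative sketch (not part of the driver): queueing bitmap IO from
 * worker context with a completion callback.  done_example() is made up for
 * this example; BM_LOCKED_MASK is assumed from drbd_int.h.
 *
 *	static void done_example(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap IO failed: %d\n", rv);
 *	}
 *
 *	// worker context only:
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_clear_n_write, done_example,
 *			     "clear_n_write from example", BM_LOCKED_MASK);
 *
 * The actual IO then happens in w_bitmap_io() once ap_bio_cnt has drained to
 * zero; new application IO is held off while BITMAP_IO is set.
 */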
3105
3106 /**
3107  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3108  * @mdev:       DRBD device.
3109  * @io_fn:      IO callback to be called when bitmap IO is possible
3110  * @why:        Descriptive text of the reason for doing the IO
3111  *
3112  * Freezes application IO while the actual IO operation runs. This
3113  * function MUST NOT be called from worker context.
3114  */
3115 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3116                 char *why, enum bm_flag flags)
3117 {
3118         int rv;
3119
3120         D_ASSERT(current != mdev->tconn->worker.task);
3121
3122         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3123                 drbd_suspend_io(mdev);
3124
3125         drbd_bm_lock(mdev, why, flags);
3126         rv = io_fn(mdev);
3127         drbd_bm_unlock(mdev);
3128
3129         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3130                 drbd_resume_io(mdev);
3131
3132         return rv;
3133 }
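
/*
 * Illustrative sketch (not part of the driver): the synchronous variant, for
 * callers outside of worker context that already hold a local disk reference
 * (get_ldev()).  BM_LOCKED_MASK is assumed from drbd_int.h.
 *
 *	int rv = drbd_bitmap_io(mdev, &drbd_bm_write,
 *				"bitmap write from example", BM_LOCKED_MASK);
 *	if (rv)
 *		dev_err(DEV, "bitmap write failed: %d\n", rv);
 *
 * Unless BM_LOCKED_SET_ALLOWED is part of the flags, application IO is
 * suspended around the call via drbd_suspend_io()/drbd_resume_io().
 */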
3134
3135 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3136 {
3137         if ((mdev->ldev->md.flags & flag) != flag) {
3138                 drbd_md_mark_dirty(mdev);
3139                 mdev->ldev->md.flags |= flag;
3140         }
3141 }
3142
3143 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3144 {
3145         if ((mdev->ldev->md.flags & flag) != 0) {
3146                 drbd_md_mark_dirty(mdev);
3147                 mdev->ldev->md.flags &= ~flag;
3148         }
3149 }
3150 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3151 {
3152         return (bdev->md.flags & flag) != 0;
3153 }
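
/*
 * Illustrative sketch (not part of the driver): the flag helpers only touch
 * the in-memory copy and mark it dirty; nothing reaches the disk until
 * drbd_md_sync() runs, either explicitly or via the md_sync_timer.  This is
 * the same pattern drbd_bmio_set_n_write() above uses for MDF_FULL_SYNC.
 *
 *	drbd_md_set_flag(mdev, MDF_FULL_SYNC);
 *	drbd_md_sync(mdev);			// persist it right away
 *	...
 *	if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC))
 *		drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
 */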
3154
3155 static void md_sync_timer_fn(unsigned long data)
3156 {
3157         struct drbd_conf *mdev = (struct drbd_conf *) data;
3158
3159         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3160 }
3161
3162 static int w_md_sync(struct drbd_work *w, int unused)
3163 {
3164         struct drbd_conf *mdev = w->mdev;
3165
3166         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3167 #ifdef DEBUG
3168         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3169                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3170 #endif
3171         drbd_md_sync(mdev);
3172         return 0;
3173 }
3174
3175 const char *cmdname(enum drbd_packet cmd)
3176 {
3177         /* THINK may need to become several global tables
3178          * when we want to support more than
3179          * one PRO_VERSION */
3180         static const char *cmdnames[] = {
3181                 [P_DATA]                = "Data",
3182                 [P_DATA_REPLY]          = "DataReply",
3183                 [P_RS_DATA_REPLY]       = "RSDataReply",
3184                 [P_BARRIER]             = "Barrier",
3185                 [P_BITMAP]              = "ReportBitMap",
3186                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3187                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3188                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3189                 [P_DATA_REQUEST]        = "DataRequest",
3190                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3191                 [P_SYNC_PARAM]          = "SyncParam",
3192                 [P_SYNC_PARAM89]        = "SyncParam89",
3193                 [P_PROTOCOL]            = "ReportProtocol",
3194                 [P_UUIDS]               = "ReportUUIDs",
3195                 [P_SIZES]               = "ReportSizes",
3196                 [P_STATE]               = "ReportState",
3197                 [P_SYNC_UUID]           = "ReportSyncUUID",
3198                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3199                 [P_AUTH_RESPONSE]       = "AuthResponse",
3200                 [P_PING]                = "Ping",
3201                 [P_PING_ACK]            = "PingAck",
3202                 [P_RECV_ACK]            = "RecvAck",
3203                 [P_WRITE_ACK]           = "WriteAck",
3204                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3205                 [P_DISCARD_WRITE]       = "DiscardWrite",
3206                 [P_NEG_ACK]             = "NegAck",
3207                 [P_NEG_DREPLY]          = "NegDReply",
3208                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3209                 [P_BARRIER_ACK]         = "BarrierAck",
3210                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3211                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3212                 [P_OV_REQUEST]          = "OVRequest",
3213                 [P_OV_REPLY]            = "OVReply",
3214                 [P_OV_RESULT]           = "OVResult",
3215                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3216                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3217                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3218                 [P_DELAY_PROBE]         = "DelayProbe",
3219                 [P_OUT_OF_SYNC]         = "OutOfSync",
3220                 [P_RETRY_WRITE]         = "RetryWrite",
3221         };
3222
3223         if (cmd == P_INITIAL_META)
3224                 return "InitialMeta";
3225         if (cmd == P_INITIAL_DATA)
3226                 return "InitialData";
3227         if (cmd == P_CONNECTION_FEATURES)
3228                 return "ConnectionFeatures";
3229         if (cmd >= ARRAY_SIZE(cmdnames) || !cmdnames[cmd])
3230                 return "Unknown";
3231         return cmdnames[cmd];
3232 }
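
/*
 * Illustrative sketch (not part of the driver): cmdname() is intended for log
 * messages on the send/receive paths, e.g.
 *
 *	dev_info(DEV, "ignoring packet %s (%d)\n", cmdname(cmd), cmd);
 *
 * Command values that are out of range, or holes in the table, come back as
 * the string "Unknown".
 */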
3233
3234 /**
3235  * drbd_wait_misc  -  wait for a request to make progress
3236  * @mdev:       device associated with the request
3237  * @i:          the struct drbd_interval embedded in struct drbd_request or
3238  *              struct drbd_peer_request
3239  */
3240 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3241 {
3242         struct net_conf *net_conf = mdev->tconn->net_conf;
3243         DEFINE_WAIT(wait);
3244         long timeout;
3245
3246         if (!net_conf)
3247                 return -ETIMEDOUT;
3248         timeout = MAX_SCHEDULE_TIMEOUT;
3249         if (net_conf->ko_count)
3250                 timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
3251
3252         /* Indicate that mdev->misc_wait should be woken up on progress. */
3253         i->waiting = true;
3254         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3255         spin_unlock_irq(&mdev->tconn->req_lock);
3256         timeout = schedule_timeout(timeout);
3257         finish_wait(&mdev->misc_wait, &wait);
3258         spin_lock_irq(&mdev->tconn->req_lock);
3259         if (!timeout || mdev->state.conn < C_CONNECTED)
3260                 return -ETIMEDOUT;
3261         if (signal_pending(current))
3262                 return -ERESTARTSYS;
3263         return 0;
3264 }
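
/*
 * Illustrative sketch (not part of the driver): drbd_wait_misc() must be
 * entered with tconn->req_lock held; it drops the lock while sleeping and
 * re-takes it before returning, so callers typically loop until the conflict
 * they wait for is gone.  conflict_exists() is a made-up predicate.
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	while (conflict_exists(i)) {
 *		int err = drbd_wait_misc(mdev, i);
 *		if (err) {
 *			spin_unlock_irq(&mdev->tconn->req_lock);
 *			return err;
 *		}
 *	}
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 */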
3265
3266 #ifdef CONFIG_DRBD_FAULT_INJECTION
3267 /* Fault insertion support including random number generator shamelessly
3268  * stolen from kernel/rcutorture.c */
3269 struct fault_random_state {
3270         unsigned long state;
3271         unsigned long count;
3272 };
3273
3274 #define FAULT_RANDOM_MULT    39916801  /* prime */
3275 #define FAULT_RANDOM_ADD     479001701 /* prime */
3276 #define FAULT_RANDOM_REFRESH 10000
3277
3278 /*
3279  * Crude but fast random-number generator.  Uses a linear congruential
3280  * generator, with occasional help from get_random_bytes().
3281  */
3282 static unsigned long
3283 _drbd_fault_random(struct fault_random_state *rsp)
3284 {
3285         long refresh;
3286
3287         if (!rsp->count--) {
3288                 get_random_bytes(&refresh, sizeof(refresh));
3289                 rsp->state += refresh;
3290                 rsp->count = FAULT_RANDOM_REFRESH;
3291         }
3292         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3293         return swahw32(rsp->state);
3294 }
3295
3296 static char *
3297 _drbd_fault_str(unsigned int type) {
3298         static char *_faults[] = {
3299                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3300                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3301                 [DRBD_FAULT_RS_WR] = "Resync write",
3302                 [DRBD_FAULT_RS_RD] = "Resync read",
3303                 [DRBD_FAULT_DT_WR] = "Data write",
3304                 [DRBD_FAULT_DT_RD] = "Data read",
3305                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3306                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3307                 [DRBD_FAULT_AL_EE] = "EE allocation",
3308                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3309         };
3310
3311         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3312 }
3313
3314 unsigned int
3315 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3316 {
3317         static struct fault_random_state rrs = {0, 0};
3318
3319         unsigned int ret = (
3320                 (fault_devs == 0 ||
3321                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3322                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3323
3324         if (ret) {
3325                 fault_count++;
3326
3327                 if (__ratelimit(&drbd_ratelimit_state))
3328                         dev_warn(DEV, "***Simulating %s failure\n",
3329                                 _drbd_fault_str(type));
3330         }
3331
3332         return ret;
3333 }
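
/*
 * Illustrative sketch (not part of the driver): IO submission paths consult
 * the fault injection hook right before issuing the real request, roughly
 * like this (bio and the DRBD_FAULT_* type are whatever the caller is about
 * to submit):
 *
 *	if (drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
 *		bio_endio(bio, -EIO);		// pretend the write failed
 *	else
 *		generic_make_request(bio);
 *
 * drbd_insert_fault() is the wrapper in drbd_int.h that compiles away when
 * CONFIG_DRBD_FAULT_INJECTION is not set; the fault_rate and fault_devs
 * module parameters checked in _drbd_insert_fault() above steer how often and
 * on which minors faults are injected.
 */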
3334 #endif
3335
3336 const char *drbd_buildtag(void)
3337 {
3338         /* When DRBD is built from external sources, this holds a reference
3339            to the git hash of the source code. */
3340
3341         static char buildtag[38] = "\0uilt-in"; /* the 'b' is filled in below, yielding "built-in" */
3342
3343         if (buildtag[0] == 0) {
3344 #ifdef CONFIG_MODULES
3345                 if (THIS_MODULE != NULL)
3346                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3347                 else
3348 #endif
3349                         buildtag[0] = 'b';
3350         }
3351
3352         return buildtag;
3353 }
3354
3355 module_init(drbd_init)
3356 module_exit(drbd_cleanup)
3357
3358 EXPORT_SYMBOL(drbd_conn_str);
3359 EXPORT_SYMBOL(drbd_role_str);
3360 EXPORT_SYMBOL(drbd_disk_str);
3361 EXPORT_SYMBOL(drbd_set_st_err_str);