drbd: rcu_read_[un]lock() for all idr accesses that do not sleep
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123 DEFINE_MUTEX(drbd_cfg_mutex);
124
125 struct kmem_cache *drbd_request_cache;
126 struct kmem_cache *drbd_ee_cache;       /* peer requests */
127 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
128 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
129 mempool_t *drbd_request_mempool;
130 mempool_t *drbd_ee_mempool;
131 mempool_t *drbd_md_io_page_pool;
132 struct bio_set *drbd_md_io_bio_set;
133
134 /* I do not use a standard mempool, because:
135    1) I want to hand out the pre-allocated objects first.
136    2) I want to be able to interrupt sleeping allocation with a signal.
137    Note: This is a singly linked list; the next pointer is the private
138          member of struct page.
139  */
140 struct page *drbd_pp_pool;
141 spinlock_t   drbd_pp_lock;
142 int          drbd_pp_vacant;
143 wait_queue_head_t drbd_pp_wait;
144
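/*
 * Editor's sketch (illustrative, not part of the original file): popping one
 * page off drbd_pp_pool under drbd_pp_lock, using the "next pointer lives in
 * page->private" convention described above.  The real pool helpers live
 * elsewhere in drbd (drbd_receiver.c).
 *
 *      struct page *page;
 *
 *      spin_lock(&drbd_pp_lock);
 *      page = drbd_pp_pool;
 *      if (page) {
 *              drbd_pp_pool = (struct page *)page_private(page);
 *              set_page_private(page, 0);
 *              drbd_pp_vacant--;
 *      }
 *      spin_unlock(&drbd_pp_lock);
 */
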
145 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
146
147 static const struct block_device_operations drbd_ops = {
148         .owner =   THIS_MODULE,
149         .open =    drbd_open,
150         .release = drbd_release,
151 };
152
153 static void bio_destructor_drbd(struct bio *bio)
154 {
155         bio_free(bio, drbd_md_io_bio_set);
156 }
157
158 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
159 {
160         struct bio *bio;
161
162         if (!drbd_md_io_bio_set)
163                 return bio_alloc(gfp_mask, 1);
164
165         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
166         if (!bio)
167                 return NULL;
168         bio->bi_destructor = bio_destructor_drbd;
169         return bio;
170 }
171
172 #ifdef __CHECKER__
173 /* When checking with sparse, and this is an inline function, sparse will
174    give tons of false positives. When this is a real function, sparse works.
175  */
176 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
177 {
178         int io_allowed;
179
180         atomic_inc(&mdev->local_cnt);
181         io_allowed = (mdev->state.disk >= mins);
182         if (!io_allowed) {
183                 if (atomic_dec_and_test(&mdev->local_cnt))
184                         wake_up(&mdev->misc_wait);
185         }
186         return io_allowed;
187 }
188
189 #endif
190
191 /**
192  * DOC: The transfer log
193  *
194  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
195  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
196  * of the list. There is always at least one &struct drbd_tl_epoch object.
197  *
198  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
199  * attached.
200  */
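/*
 * Editor's note, a rough picture of the structure described above:
 *
 *   tconn->oldest_tle -> [epoch] -> [epoch] -> [epoch] <- tconn->newest_tle
 *                        requests   requests   requests
 *
 * ->next links the epochs from oldest to newest, and each epoch carries its
 * own circular list of struct drbd_request in ->requests; tl_init() below
 * starts the log with a single epoch, and _tl_add_barrier() links new ones
 * in at the newest end.
 */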
201 static int tl_init(struct drbd_tconn *tconn)
202 {
203         struct drbd_tl_epoch *b;
204
205         /* during device minor initialization, we may well use GFP_KERNEL */
206         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
207         if (!b)
208                 return 0;
209         INIT_LIST_HEAD(&b->requests);
210         INIT_LIST_HEAD(&b->w.list);
211         b->next = NULL;
212         b->br_number = 4711;
213         b->n_writes = 0;
214         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
215
216         tconn->oldest_tle = b;
217         tconn->newest_tle = b;
218         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch object, this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307            of the write to the local disk. To avoid corruptions of
308            slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and a violation of write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_del_init(&b->requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL b was the only barrier, and becomes the new
328                    barrier. Therefore tconn->oldest_tle already points to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
373                 }
374                 tmp = b->next;
375
376                 if (n_writes) {
377                         if (what == RESEND) {
378                                 b->n_writes = n_writes;
379                                 if (b->w.cb == NULL) {
380                                         b->w.cb = w_send_barrier;
381                                         inc_ap_pending(b->w.mdev);
382                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383                                 }
384
385                                 drbd_queue_work(&tconn->data.work, &b->w);
386                         }
387                         pn = &b->next;
388                 } else {
389                         if (n_reads)
390                                 list_add(&carry_reads, &b->requests);
391                         /* there could still be requests on that ring list,
392                          * in case local io is still pending */
393                         list_del(&b->requests);
394
395                         /* dec_ap_pending corresponding to queue_barrier.
396                          * the newest barrier may not have been queued yet,
397                          * in which case w.cb is still NULL. */
398                         if (b->w.cb != NULL)
399                                 dec_ap_pending(b->w.mdev);
400
401                         if (b == tconn->newest_tle) {
402                                 /* recycle, but reinit! */
403                                 if (tmp != NULL)
404                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
405                                 INIT_LIST_HEAD(&b->requests);
406                                 list_splice(&carry_reads, &b->requests);
407                                 INIT_LIST_HEAD(&b->w.list);
408                                 b->w.cb = NULL;
409                                 b->br_number = net_random();
410                                 b->n_writes = 0;
411
412                                 *pn = b;
413                                 break;
414                         }
415                         *pn = tmp;
416                         kfree(b);
417                 }
418                 b = tmp;
419                 list_splice(&carry_reads, &b->requests);
420         }
421 }
422
423
424 /**
425  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
426  * @tconn:      DRBD connection.
427  *
428  * This is called after the connection to the peer was lost. The storage covered
429  * by the requests on the transfer log gets marked as out of sync. Called from the
430  * receiver thread and the worker thread.
431  */
432 void tl_clear(struct drbd_tconn *tconn)
433 {
434         struct drbd_conf *mdev;
435         struct list_head *le, *tle;
436         struct drbd_request *r;
437         int vnr;
438
439         spin_lock_irq(&tconn->req_lock);
440
441         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
442
443         /* we expect this list to be empty. */
444         if (!list_empty(&tconn->out_of_sequence_requests))
445                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
446
447         /* but just in case, clean it up anyways! */
448         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
449                 r = list_entry(le, struct drbd_request, tl_requests);
450                 /* It would be nice to complete outside of spinlock.
451                  * But this is easier for now. */
452                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
453         }
454
455         /* ensure bit indicating barrier is required is clear */
456         rcu_read_lock();
457         idr_for_each_entry(&tconn->volumes, mdev, vnr)
458                 clear_bit(CREATE_BARRIER, &mdev->flags);
459         rcu_read_unlock();
460
461         spin_unlock_irq(&tconn->req_lock);
462 }
463
464 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
465 {
466         spin_lock_irq(&tconn->req_lock);
467         _tl_restart(tconn, what);
468         spin_unlock_irq(&tconn->req_lock);
469 }
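
/*
 * Editor's note on the locking convention above: the underscore-prefixed
 * helpers (_tl_add_barrier(), _tl_restart()) expect the caller to hold
 * tconn->req_lock, while tl_release(), tl_clear() and tl_restart() take and
 * release that spinlock themselves, e.g.:
 *
 *      spin_lock_irq(&tconn->req_lock);
 *      _tl_restart(tconn, RESEND);
 *      spin_unlock_irq(&tconn->req_lock);
 */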
470
471 static int drbd_thread_setup(void *arg)
472 {
473         struct drbd_thread *thi = (struct drbd_thread *) arg;
474         struct drbd_tconn *tconn = thi->tconn;
475         unsigned long flags;
476         int retval;
477
478         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
479                  thi->name[0], thi->tconn->name);
480
481 restart:
482         retval = thi->function(thi);
483
484         spin_lock_irqsave(&thi->t_lock, flags);
485
486         /* if the receiver has been "EXITING", the last thing it did
487          * was set the conn state to "StandAlone",
488          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
489          * and receiver thread will be "started".
490          * drbd_thread_start needs to set "RESTARTING" in that case.
491          * t_state check and assignment needs to be within the same spinlock,
492          * so either thread_start sees EXITING, and can remap to RESTARTING,
493  * or thread_start sees NONE, and can proceed as normal.
494          */
495
496         if (thi->t_state == RESTARTING) {
497                 conn_info(tconn, "Restarting %s thread\n", thi->name);
498                 thi->t_state = RUNNING;
499                 spin_unlock_irqrestore(&thi->t_lock, flags);
500                 goto restart;
501         }
502
503         thi->task = NULL;
504         thi->t_state = NONE;
505         smp_mb();
506         complete(&thi->stop);
507         spin_unlock_irqrestore(&thi->t_lock, flags);
508
509         conn_info(tconn, "Terminating %s\n", current->comm);
510
511         /* Release mod reference taken when thread was started */
512         module_put(THIS_MODULE);
513         return retval;
514 }
515
516 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
517                              int (*func) (struct drbd_thread *), char *name)
518 {
519         spin_lock_init(&thi->t_lock);
520         thi->task    = NULL;
521         thi->t_state = NONE;
522         thi->function = func;
523         thi->tconn = tconn;
524         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
525 }
526
527 int drbd_thread_start(struct drbd_thread *thi)
528 {
529         struct drbd_tconn *tconn = thi->tconn;
530         struct task_struct *nt;
531         unsigned long flags;
532
533         /* is used from state engine doing drbd_thread_stop_nowait,
534          * while holding the req lock irqsave */
535         spin_lock_irqsave(&thi->t_lock, flags);
536
537         switch (thi->t_state) {
538         case NONE:
539                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
540                          thi->name, current->comm, current->pid);
541
542                 /* Get ref on module for thread - this is released when thread exits */
543                 if (!try_module_get(THIS_MODULE)) {
544                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
545                         spin_unlock_irqrestore(&thi->t_lock, flags);
546                         return false;
547                 }
548
549                 init_completion(&thi->stop);
550                 thi->reset_cpu_mask = 1;
551                 thi->t_state = RUNNING;
552                 spin_unlock_irqrestore(&thi->t_lock, flags);
553                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
554
555                 nt = kthread_create(drbd_thread_setup, (void *) thi,
556                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
557
558                 if (IS_ERR(nt)) {
559                         conn_err(tconn, "Couldn't start thread\n");
560
561                         module_put(THIS_MODULE);
562                         return false;
563                 }
564                 spin_lock_irqsave(&thi->t_lock, flags);
565                 thi->task = nt;
566                 thi->t_state = RUNNING;
567                 spin_unlock_irqrestore(&thi->t_lock, flags);
568                 wake_up_process(nt);
569                 break;
570         case EXITING:
571                 thi->t_state = RESTARTING;
572                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
573                                 thi->name, current->comm, current->pid);
574                 /* fall through */
575         case RUNNING:
576         case RESTARTING:
577         default:
578                 spin_unlock_irqrestore(&thi->t_lock, flags);
579                 break;
580         }
581
582         return true;
583 }
584
585
586 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
587 {
588         unsigned long flags;
589
590         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
591
592         /* may be called from state engine, holding the req lock irqsave */
593         spin_lock_irqsave(&thi->t_lock, flags);
594
595         if (thi->t_state == NONE) {
596                 spin_unlock_irqrestore(&thi->t_lock, flags);
597                 if (restart)
598                         drbd_thread_start(thi);
599                 return;
600         }
601
602         if (thi->t_state != ns) {
603                 if (thi->task == NULL) {
604                         spin_unlock_irqrestore(&thi->t_lock, flags);
605                         return;
606                 }
607
608                 thi->t_state = ns;
609                 smp_mb();
610                 init_completion(&thi->stop);
611                 if (thi->task != current)
612                         force_sig(DRBD_SIGKILL, thi->task);
613         }
614
615         spin_unlock_irqrestore(&thi->t_lock, flags);
616
617         if (wait)
618                 wait_for_completion(&thi->stop);
619 }
620
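/*
 * Editor's summary of the thread state machine above, as far as it can be
 * read off the code (orientation only):
 *
 *   NONE       --drbd_thread_start()--> RUNNING
 *   RUNNING    --_drbd_thread_stop()--> EXITING (or RESTARTING on restart)
 *   EXITING    --drbd_thread_start()--> RESTARTING
 *   RESTARTING --drbd_thread_setup()--> RUNNING  (re-enters thi->function)
 *   function returns                 --> NONE    (in drbd_thread_setup())
 *
 * All t_state transitions happen under thi->t_lock.
 */
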
621 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
622 {
623         struct drbd_thread *thi =
624                 task == tconn->receiver.task ? &tconn->receiver :
625                 task == tconn->asender.task  ? &tconn->asender :
626                 task == tconn->worker.task   ? &tconn->worker : NULL;
627
628         return thi;
629 }
630
631 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
632 {
633         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
634         return thi ? thi->name : task->comm;
635 }
636
637 int conn_lowest_minor(struct drbd_tconn *tconn)
638 {
639         struct drbd_conf *mdev;
640         int vnr = 0, m;
641
642         rcu_read_lock();
643         mdev = idr_get_next(&tconn->volumes, &vnr);
644         m = mdev ? mdev_to_minor(mdev) : -1;
645         rcu_read_unlock();
646
647         return m;
648 }
649
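/*
 * Editor's note: conn_lowest_minor() above and tl_clear() earlier show the
 * pattern this patch is about - every idr access that does not sleep is
 * bracketed by rcu_read_lock()/rcu_read_unlock(), e.g.:
 *
 *      rcu_read_lock();
 *      idr_for_each_entry(&tconn->volumes, mdev, vnr)
 *              clear_bit(CREATE_BARRIER, &mdev->flags);
 *      rcu_read_unlock();
 */
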
650 #ifdef CONFIG_SMP
651 /**
652  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
653  * @tconn:      DRBD connection.
654  *
655  * Forces all threads of a connection onto the same CPU. This is beneficial for
656  * DRBD's performance. May be overridden by the user's configuration.
657  */
658 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
659 {
660         int ord, cpu;
661
662         /* user override. */
663         if (cpumask_weight(tconn->cpu_mask))
664                 return;
665
666         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
667         for_each_online_cpu(cpu) {
668                 if (ord-- == 0) {
669                         cpumask_set_cpu(cpu, tconn->cpu_mask);
670                         return;
671                 }
672         }
673         /* should not be reached */
674         cpumask_setall(tconn->cpu_mask);
675 }
676
677 /**
678  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
679  * @thi:        drbd_thread object
681  *
682  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
683  * prematurely.
684  */
685 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
686 {
687         struct task_struct *p = current;
688
689         if (!thi->reset_cpu_mask)
690                 return;
691         thi->reset_cpu_mask = 0;
692         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
693 }
694 #endif
695
696 /**
697  * drbd_header_size  -  size of a packet header
698  *
699  * The header size is a multiple of 8, so any payload following the header is
700  * word aligned on 64-bit architectures.  (The bitmap send and receive code
701  * relies on this.)
702  */
703 unsigned int drbd_header_size(struct drbd_tconn *tconn)
704 {
705         if (tconn->agreed_pro_version >= 100) {
706                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
707                 return sizeof(struct p_header100);
708         } else {
709                 BUILD_BUG_ON(sizeof(struct p_header80) !=
710                              sizeof(struct p_header95));
711                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
712                 return sizeof(struct p_header80);
713         }
714 }
715
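/*
 * Editor's summary of the on-wire header layouts, read off the
 * prepare_header*() helpers below (field order assumed to match the structs;
 * all sizes are multiples of 8, per the BUILD_BUG_ONs above):
 *
 *   p_header80:  be32 magic, be16 command, be16 length             (8 bytes)
 *   p_header95:  be16 magic, be16 command, be32 length             (8 bytes)
 *   p_header100: be32 magic, be16 volume, be16 command, be32 length,
 *                32-bit pad                                        (16 bytes)
 */
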
716 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
717 {
718         h->magic   = cpu_to_be32(DRBD_MAGIC);
719         h->command = cpu_to_be16(cmd);
720         h->length  = cpu_to_be16(size);
721         return sizeof(struct p_header80);
722 }
723
724 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
725 {
726         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
727         h->command = cpu_to_be16(cmd);
728         h->length = cpu_to_be32(size);
729         return sizeof(struct p_header95);
730 }
731
732 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
733                                       int size, int vnr)
734 {
735         h->magic = cpu_to_be32(DRBD_MAGIC_100);
736         h->volume = cpu_to_be16(vnr);
737         h->command = cpu_to_be16(cmd);
738         h->length = cpu_to_be32(size);
739         h->pad = 0;
740         return sizeof(struct p_header100);
741 }
742
743 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
744                                    void *buffer, enum drbd_packet cmd, int size)
745 {
746         if (tconn->agreed_pro_version >= 100)
747                 return prepare_header100(buffer, cmd, size, vnr);
748         else if (tconn->agreed_pro_version >= 95 &&
749                  size > DRBD_MAX_SIZE_H80_PACKET)
750                 return prepare_header95(buffer, cmd, size);
751         else
752                 return prepare_header80(buffer, cmd, size);
753 }
754
755 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
756 {
757         mutex_lock(&sock->mutex);
758         if (!sock->socket) {
759                 mutex_unlock(&sock->mutex);
760                 return NULL;
761         }
762         return sock->sbuf + drbd_header_size(tconn);
763 }
764
765 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
766 {
767         return conn_prepare_command(mdev->tconn, sock);
768 }
769
770 static int __send_command(struct drbd_tconn *tconn, int vnr,
771                           struct drbd_socket *sock, enum drbd_packet cmd,
772                           unsigned int header_size, void *data,
773                           unsigned int size)
774 {
775         int msg_flags;
776         int err;
777
778         /*
779          * Called with @data == NULL and the size of the data blocks in @size
780          * for commands that send data blocks.  For those commands, omit the
781          * MSG_MORE flag: this will increase the likelihood that data blocks
782          * which are page aligned on the sender will end up page aligned on the
783          * receiver.
784          */
785         msg_flags = data ? MSG_MORE : 0;
786
787         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
788                                       header_size + size);
789         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
790                             msg_flags);
791         if (data && !err)
792                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
793         return err;
794 }
795
796 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
797                       enum drbd_packet cmd, unsigned int header_size,
798                       void *data, unsigned int size)
799 {
800         int err;
801
802         err = __send_command(tconn, 0, sock, cmd, header_size, data, size);
803         mutex_unlock(&sock->mutex);
804         return err;
805 }
806
807 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
808                       enum drbd_packet cmd, unsigned int header_size,
809                       void *data, unsigned int size)
810 {
811         int err;
812
813         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
814                              data, size);
815         mutex_unlock(&sock->mutex);
816         return err;
817 }
818
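/*
 * Editor's note on the prepare/send protocol used above and by the senders
 * below: conn_prepare_command()/drbd_prepare_command() take sock->mutex and
 * return a pointer into sock->sbuf just past the header; the matching
 * *_send_command() transmits the packet and drops the mutex.  A typical
 * caller therefore looks like this (illustrative; "p->field", "value" and
 * "cmd" stand in for a real payload):
 *
 *      p = drbd_prepare_command(mdev, sock);
 *      if (!p)
 *              return -EIO;
 *      p->field = cpu_to_be32(value);
 *      return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
 */
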
819 int drbd_send_ping(struct drbd_tconn *tconn)
820 {
821         struct drbd_socket *sock;
822
823         sock = &tconn->meta;
824         if (!conn_prepare_command(tconn, sock))
825                 return -EIO;
826         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
827 }
828
829 int drbd_send_ping_ack(struct drbd_tconn *tconn)
830 {
831         struct drbd_socket *sock;
832
833         sock = &tconn->meta;
834         if (!conn_prepare_command(tconn, sock))
835                 return -EIO;
836         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
837 }
838
839 int drbd_send_sync_param(struct drbd_conf *mdev)
840 {
841         struct drbd_socket *sock;
842         struct p_rs_param_95 *p;
843         int size;
844         const int apv = mdev->tconn->agreed_pro_version;
845         enum drbd_packet cmd;
846
847         sock = &mdev->tconn->data;
848         p = drbd_prepare_command(mdev, sock);
849         if (!p)
850                 return -EIO;
851
852         size = apv <= 87 ? sizeof(struct p_rs_param)
853                 : apv == 88 ? sizeof(struct p_rs_param)
854                         + strlen(mdev->tconn->net_conf->verify_alg) + 1
855                 : apv <= 94 ? sizeof(struct p_rs_param_89)
856                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
857
858         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
859
860         /* initialize verify_alg and csums_alg */
861         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
862
863         if (get_ldev(mdev)) {
864                 p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
865                 p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
866                 p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
867                 p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
868                 p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
869                 put_ldev(mdev);
870         } else {
871                 p->rate = cpu_to_be32(DRBD_RATE_DEF);
872                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
873                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
874                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
875                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
876         }
877
878         if (apv >= 88)
879                 strcpy(p->verify_alg, mdev->tconn->net_conf->verify_alg);
880         if (apv >= 89)
881                 strcpy(p->csums_alg, mdev->tconn->net_conf->csums_alg);
882
883         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
884 }
885
886 int drbd_send_protocol(struct drbd_tconn *tconn)
887 {
888         struct drbd_socket *sock;
889         struct p_protocol *p;
890         int size, cf;
891
892         if (tconn->net_conf->dry_run && tconn->agreed_pro_version < 92) {
893                 conn_err(tconn, "--dry-run is not supported by peer");
894                 return -EOPNOTSUPP;
895         }
896
897         sock = &tconn->data;
898         p = conn_prepare_command(tconn, sock);
899         if (!p)
900                 return -EIO;
901
902         size = sizeof(*p);
903         if (tconn->agreed_pro_version >= 87)
904                 size += strlen(tconn->net_conf->integrity_alg) + 1;
905
906         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
907         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
908         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
909         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
910         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
911         cf = 0;
912         if (tconn->net_conf->want_lose)
913                 cf |= CF_WANT_LOSE;
914         if (tconn->net_conf->dry_run)
915                 cf |= CF_DRY_RUN;
916         p->conn_flags    = cpu_to_be32(cf);
917
918         if (tconn->agreed_pro_version >= 87)
919                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
920         return conn_send_command(tconn, sock, P_PROTOCOL, size, NULL, 0);
921 }
922
923 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
924 {
925         struct drbd_socket *sock;
926         struct p_uuids *p;
927         int i;
928
929         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
930                 return 0;
931
932         sock = &mdev->tconn->data;
933         p = drbd_prepare_command(mdev, sock);
934         if (!p) {
935                 put_ldev(mdev);
936                 return -EIO;
937         }
938         for (i = UI_CURRENT; i < UI_SIZE; i++)
939                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
940
941         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
942         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
943         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
944         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
945         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
946         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
947
948         put_ldev(mdev);
949         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
950 }
951
952 int drbd_send_uuids(struct drbd_conf *mdev)
953 {
954         return _drbd_send_uuids(mdev, 0);
955 }
956
957 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
958 {
959         return _drbd_send_uuids(mdev, 8);
960 }
961
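/*
 * Editor's note: the uuid_flags bits set in _drbd_send_uuids() above, as far
 * as this file shows them: 1 = want_lose, 2 = CRASHED_PRIMARY, 4 = new disk
 * state is D_INCONSISTENT; 8 is what drbd_send_uuids_skip_initial_sync()
 * passes, presumably read by the peer as "skip the initial sync".
 */
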
962 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
963 {
964         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
965                 u64 *uuid = mdev->ldev->md.uuid;
966                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
967                      text,
968                      (unsigned long long)uuid[UI_CURRENT],
969                      (unsigned long long)uuid[UI_BITMAP],
970                      (unsigned long long)uuid[UI_HISTORY_START],
971                      (unsigned long long)uuid[UI_HISTORY_END]);
972                 put_ldev(mdev);
973         } else {
974                 dev_info(DEV, "%s effective data uuid: %016llX\n",
975                                 text,
976                                 (unsigned long long)mdev->ed_uuid);
977         }
978 }
979
980 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
981 {
982         struct drbd_socket *sock;
983         struct p_rs_uuid *p;
984         u64 uuid;
985
986         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
987
988         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
989         drbd_uuid_set(mdev, UI_BITMAP, uuid);
990         drbd_print_uuids(mdev, "updated sync UUID");
991         drbd_md_sync(mdev);
992
993         sock = &mdev->tconn->data;
994         p = drbd_prepare_command(mdev, sock);
995         if (p) {
996                 p->uuid = cpu_to_be64(uuid);
997                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
998         }
999 }
1000
1001 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1002 {
1003         struct drbd_socket *sock;
1004         struct p_sizes *p;
1005         sector_t d_size, u_size;
1006         int q_order_type, max_bio_size;
1007
1008         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1009                 D_ASSERT(mdev->ldev->backing_bdev);
1010                 d_size = drbd_get_max_capacity(mdev->ldev);
1011                 u_size = mdev->ldev->dc.disk_size;
1012                 q_order_type = drbd_queue_order_type(mdev);
1013                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1014                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1015                 put_ldev(mdev);
1016         } else {
1017                 d_size = 0;
1018                 u_size = 0;
1019                 q_order_type = QUEUE_ORDERED_NONE;
1020                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1021         }
1022
1023         sock = &mdev->tconn->data;
1024         p = drbd_prepare_command(mdev, sock);
1025         if (!p)
1026                 return -EIO;
1027         p->d_size = cpu_to_be64(d_size);
1028         p->u_size = cpu_to_be64(u_size);
1029         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1030         p->max_bio_size = cpu_to_be32(max_bio_size);
1031         p->queue_order_type = cpu_to_be16(q_order_type);
1032         p->dds_flags = cpu_to_be16(flags);
1033         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1034 }
1035
1036 /**
1037  * drbd_send_state() - Sends the drbd state to the peer
1038  * @mdev:       DRBD device.
1039  */
1040 int drbd_send_state(struct drbd_conf *mdev)
1041 {
1042         struct drbd_socket *sock;
1043         struct p_state *p;
1044
1045         sock = &mdev->tconn->data;
1046         p = drbd_prepare_command(mdev, sock);
1047         if (!p)
1048                 return -EIO;
1049         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1050         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1051 }
1052
1053 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1054 {
1055         struct drbd_socket *sock;
1056         struct p_req_state *p;
1057
1058         sock = &mdev->tconn->data;
1059         p = drbd_prepare_command(mdev, sock);
1060         if (!p)
1061                 return -EIO;
1062         p->mask = cpu_to_be32(mask.i);
1063         p->val = cpu_to_be32(val.i);
1064         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1065
1066 }
1067
1068 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1069 {
1070         enum drbd_packet cmd;
1071         struct drbd_socket *sock;
1072         struct p_req_state *p;
1073
1074         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1075         sock = &tconn->data;
1076         p = conn_prepare_command(tconn, sock);
1077         if (!p)
1078                 return -EIO;
1079         p->mask = cpu_to_be32(mask.i);
1080         p->val = cpu_to_be32(val.i);
1081         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1082 }
1083
1084 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1085 {
1086         struct drbd_socket *sock;
1087         struct p_req_state_reply *p;
1088
1089         sock = &mdev->tconn->meta;
1090         p = drbd_prepare_command(mdev, sock);
1091         if (p) {
1092                 p->retcode = cpu_to_be32(retcode);
1093                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1094         }
1095 }
1096
1097 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1098 {
1099         struct drbd_socket *sock;
1100         struct p_req_state_reply *p;
1101         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1102
1103         sock = &tconn->meta;
1104         p = conn_prepare_command(tconn, sock);
1105         if (p) {
1106                 p->retcode = cpu_to_be32(retcode);
1107                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1108         }
1109 }
1110
1111 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1112 {
1113         BUG_ON(code & ~0xf);
1114         p->encoding = (p->encoding & ~0xf) | code;
1115 }
1116
1117 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1118 {
1119         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1120 }
1121
1122 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1123 {
1124         BUG_ON(n & ~0x7);
1125         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1126 }
1127
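/*
 * Editor's summary of the p->encoding byte manipulated by the dcbp_set_*()
 * helpers above:
 *
 *   bit  7    : "start" flag - whether the first run length describes set bits
 *   bits 6..4 : number of pad bits at the end of the code string
 *   bits 3..0 : drbd_bitmap_code, e.g. RLE_VLI_Bits
 */
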
1128 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1129                          struct p_compressed_bm *p,
1130                          unsigned int size,
1131                          struct bm_xfer_ctx *c)
1132 {
1133         struct bitstream bs;
1134         unsigned long plain_bits;
1135         unsigned long tmp;
1136         unsigned long rl;
1137         unsigned len;
1138         unsigned toggle;
1139         int bits;
1140
1141         /* may we use this feature? */
1142         if ((mdev->tconn->net_conf->use_rle == 0) ||
1143                 (mdev->tconn->agreed_pro_version < 90))
1144                         return 0;
1145
1146         if (c->bit_offset >= c->bm_bits)
1147                 return 0; /* nothing to do. */
1148
1149         /* use at most this many bytes */
1150         bitstream_init(&bs, p->code, size, 0);
1151         memset(p->code, 0, size);
1152         /* plain bits covered in this code string */
1153         plain_bits = 0;
1154
1155         /* p->encoding & 0x80 stores whether the first run is of set bits.
1156          * bit offset is implicit.
1157          * start with toggle == 2 to be able to tell the first iteration */
1158         toggle = 2;
1159
1160         /* see how many plain bits we can stuff into one packet
1161          * using RLE and VLI. */
1162         do {
1163                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1164                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1165                 if (tmp == -1UL)
1166                         tmp = c->bm_bits;
1167                 rl = tmp - c->bit_offset;
1168
1169                 if (toggle == 2) { /* first iteration */
1170                         if (rl == 0) {
1171                                 /* the first checked bit was set,
1172                                  * store start value, */
1173                                 dcbp_set_start(p, 1);
1174                                 /* but skip encoding of zero run length */
1175                                 toggle = !toggle;
1176                                 continue;
1177                         }
1178                         dcbp_set_start(p, 0);
1179                 }
1180
1181                 /* paranoia: catch zero runlength.
1182                  * can only happen if bitmap is modified while we scan it. */
1183                 if (rl == 0) {
1184                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1185                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1186                         return -1;
1187                 }
1188
1189                 bits = vli_encode_bits(&bs, rl);
1190                 if (bits == -ENOBUFS) /* buffer full */
1191                         break;
1192                 if (bits <= 0) {
1193                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1194                         return 0;
1195                 }
1196
1197                 toggle = !toggle;
1198                 plain_bits += rl;
1199                 c->bit_offset = tmp;
1200         } while (c->bit_offset < c->bm_bits);
1201
1202         len = bs.cur.b - p->code + !!bs.cur.bit;
1203
1204         if (plain_bits < (len << 3)) {
1205                 /* incompressible with this method.
1206                  * we need to rewind both word and bit position. */
1207                 c->bit_offset -= plain_bits;
1208                 bm_xfer_ctx_bit_to_word_offset(c);
1209                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1210                 return 0;
1211         }
1212
1213         /* RLE + VLI was able to compress it just fine.
1214          * update c->word_offset. */
1215         bm_xfer_ctx_bit_to_word_offset(c);
1216
1217         /* store pad_bits */
1218         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1219
1220         return len;
1221 }
1222
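/*
 * Editor's worked example for the encoder above (illustrative): a bitmap
 * chunk of 1000 clear bits, then 24 set bits, then more clear bits goes out
 * as
 *
 *   start flag = 0 (the first run describes clear bits),
 *   vli_encode_bits(1000), vli_encode_bits(24), ...
 *
 * i.e. only the alternating run lengths are transmitted, not the bits
 * themselves.
 */
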
1223 /**
1224  * send_bitmap_rle_or_plain
1225  *
1226  * Return 0 when done, 1 when another iteration is needed, and a negative error
1227  * code upon failure.
1228  */
1229 static int
1230 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1231 {
1232         struct drbd_socket *sock = &mdev->tconn->data;
1233         unsigned int header_size = drbd_header_size(mdev->tconn);
1234         struct p_compressed_bm *p = sock->sbuf + header_size;
1235         int len, err;
1236
1237         len = fill_bitmap_rle_bits(mdev, p,
1238                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1239         if (len < 0)
1240                 return -EIO;
1241
1242         if (len) {
1243                 dcbp_set_code(p, RLE_VLI_Bits);
1244                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1245                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1246                                      NULL, 0);
1247                 c->packets[0]++;
1248                 c->bytes[0] += header_size + sizeof(*p) + len;
1249
1250                 if (c->bit_offset >= c->bm_bits)
1251                         len = 0; /* DONE */
1252         } else {
1253                 /* was not compressible.
1254                  * send a buffer full of plain text bits instead. */
1255                 unsigned int data_size;
1256                 unsigned long num_words;
1257                 unsigned long *p = sock->sbuf + header_size;
1258
1259                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1260                 num_words = min_t(size_t, data_size / sizeof(*p),
1261                                   c->bm_words - c->word_offset);
1262                 len = num_words * sizeof(*p);
1263                 if (len)
1264                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1265                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1266                 c->word_offset += num_words;
1267                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1268
1269                 c->packets[1]++;
1270                 c->bytes[1] += header_size + len;
1271
1272                 if (c->bit_offset > c->bm_bits)
1273                         c->bit_offset = c->bm_bits;
1274         }
1275         if (!err) {
1276                 if (len == 0) {
1277                         INFO_bm_xfer_stats(mdev, "send", c);
1278                         return 0;
1279                 } else
1280                         return 1;
1281         }
1282         return -EIO;
1283 }
1284
1285 /* See the comment at receive_bitmap() */
1286 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1287 {
1288         struct bm_xfer_ctx c;
1289         int err;
1290
1291         if (!expect(mdev->bitmap))
1292                 return false;
1293
1294         if (get_ldev(mdev)) {
1295                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1296                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1297                         drbd_bm_set_all(mdev);
1298                         if (drbd_bm_write(mdev)) {
1299                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1300                                  * but otherwise process as per normal - need to tell other
1301                                  * side that a full resync is required! */
1302                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1303                         } else {
1304                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1305                                 drbd_md_sync(mdev);
1306                         }
1307                 }
1308                 put_ldev(mdev);
1309         }
1310
1311         c = (struct bm_xfer_ctx) {
1312                 .bm_bits = drbd_bm_bits(mdev),
1313                 .bm_words = drbd_bm_words(mdev),
1314         };
1315
1316         do {
1317                 err = send_bitmap_rle_or_plain(mdev, &c);
1318         } while (err > 0);
1319
1320         return err == 0;
1321 }
1322
1323 int drbd_send_bitmap(struct drbd_conf *mdev)
1324 {
1325         struct drbd_socket *sock = &mdev->tconn->data;
1326         int err = -1;
1327
1328         mutex_lock(&sock->mutex);
1329         if (sock->socket)
1330                 err = !_drbd_send_bitmap(mdev);
1331         mutex_unlock(&sock->mutex);
1332         return err;
1333 }
1334
1335 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1336 {
1337         struct drbd_socket *sock;
1338         struct p_barrier_ack *p;
1339
1340         if (mdev->state.conn < C_CONNECTED)
1341                 return;
1342
1343         sock = &mdev->tconn->meta;
1344         p = drbd_prepare_command(mdev, sock);
1345         if (!p)
1346                 return;
1347         p->barrier = barrier_nr;
1348         p->set_size = cpu_to_be32(set_size);
1349         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1350 }
1351
1352 /**
1353  * _drbd_send_ack() - Sends an ack packet
1354  * @mdev:       DRBD device.
1355  * @cmd:        Packet command code.
1356  * @sector:     sector, needs to be in big endian byte order
1357  * @blksize:    size in byte, needs to be in big endian byte order
1358  * @block_id:   Id, big endian byte order
1359  */
1360 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1361                           u64 sector, u32 blksize, u64 block_id)
1362 {
1363         struct drbd_socket *sock;
1364         struct p_block_ack *p;
1365
1366         if (mdev->state.conn < C_CONNECTED)
1367                 return -EIO;
1368
1369         sock = &mdev->tconn->meta;
1370         p = drbd_prepare_command(mdev, sock);
1371         if (!p)
1372                 return -EIO;
1373         p->sector = sector;
1374         p->block_id = block_id;
1375         p->blksize = blksize;
1376         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1377         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1378 }
1379
1380 /* dp->sector and dp->block_id already/still in network byte order,
1381  * data_size is payload size according to dp->head,
1382  * and may need to be corrected for digest size. */
1383 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1384                       struct p_data *dp, int data_size)
1385 {
1386         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1387                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1388         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1389                        dp->block_id);
1390 }
1391
1392 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1393                       struct p_block_req *rp)
1394 {
1395         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1396 }
1397
1398 /**
1399  * drbd_send_ack() - Sends an ack packet
1400  * @mdev:       DRBD device
1401  * @cmd:        packet command code
1402  * @peer_req:   peer request
1403  */
1404 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1405                   struct drbd_peer_request *peer_req)
1406 {
1407         return _drbd_send_ack(mdev, cmd,
1408                               cpu_to_be64(peer_req->i.sector),
1409                               cpu_to_be32(peer_req->i.size),
1410                               peer_req->block_id);
1411 }
1412
1413 /* This function misuses the block_id field to signal if the blocks
1414  * are in sync or not. */
1415 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1416                      sector_t sector, int blksize, u64 block_id)
1417 {
1418         return _drbd_send_ack(mdev, cmd,
1419                               cpu_to_be64(sector),
1420                               cpu_to_be32(blksize),
1421                               cpu_to_be64(block_id));
1422 }
1423
1424 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1425                        sector_t sector, int size, u64 block_id)
1426 {
1427         struct drbd_socket *sock;
1428         struct p_block_req *p;
1429
1430         sock = &mdev->tconn->data;
1431         p = drbd_prepare_command(mdev, sock);
1432         if (!p)
1433                 return -EIO;
1434         p->sector = cpu_to_be64(sector);
1435         p->block_id = block_id;
1436         p->blksize = cpu_to_be32(size);
1437         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1438 }
1439
1440 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1441                             void *digest, int digest_size, enum drbd_packet cmd)
1442 {
1443         struct drbd_socket *sock;
1444         struct p_block_req *p;
1445
1446         /* FIXME: Put the digest into the preallocated socket buffer.  */
1447
1448         sock = &mdev->tconn->data;
1449         p = drbd_prepare_command(mdev, sock);
1450         if (!p)
1451                 return -EIO;
1452         p->sector = cpu_to_be64(sector);
1453         p->block_id = ID_SYNCER /* unused */;
1454         p->blksize = cpu_to_be32(size);
1455         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1456                                  digest, digest_size);
1457 }
1458
1459 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1460 {
1461         struct drbd_socket *sock;
1462         struct p_block_req *p;
1463
1464         sock = &mdev->tconn->data;
1465         p = drbd_prepare_command(mdev, sock);
1466         if (!p)
1467                 return -EIO;
1468         p->sector = cpu_to_be64(sector);
1469         p->block_id = ID_SYNCER /* unused */;
1470         p->blksize = cpu_to_be32(size);
1471         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1472 }
1473
1474 /* called on sndtimeo
1475  * returns false if we should retry,
1476  * true if we think the connection is dead
1477  */
1478 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1479 {
1480         int drop_it;
1481         /* long elapsed = (long)(jiffies - mdev->last_received); */
1482
1483         drop_it =   tconn->meta.socket == sock
1484                 || !tconn->asender.task
1485                 || get_t_state(&tconn->asender) != RUNNING
1486                 || tconn->cstate < C_WF_REPORT_PARAMS;
1487
1488         if (drop_it)
1489                 return true;
1490
1491         drop_it = !--tconn->ko_count;
1492         if (!drop_it) {
1493                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1494                          current->comm, current->pid, tconn->ko_count);
1495                 request_ping(tconn);
1496         }
1497
1498         return drop_it; /* && (mdev->state == R_PRIMARY) */
1499 }
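/* Sketch of the retry policy above (descriptive only): each time the data
 * socket hits its send timeout we end up here, decrement tconn->ko_count,
 * log it and ask the asender to ping the peer over the meta socket; only
 * when ko_count reaches zero, or the asender is gone, or the connection
 * state has dropped below C_WF_REPORT_PARAMS, do we declare the peer dead. */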
1500
1501 static void drbd_update_congested(struct drbd_tconn *tconn)
1502 {
1503         struct sock *sk = tconn->data.socket->sk;
1504         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1505                 set_bit(NET_CONGESTED, &tconn->flags);
1506 }
1507
1508 /* The idea of sendpage seems to be to put some kind of reference
1509  * to the page into the skb, and to hand it over to the NIC. In
1510  * this process get_page() gets called.
1511  *
1512  * As soon as the page has really been sent over the network, put_page()
1513  * gets called by some part of the network layer. [ NIC driver? ]
1514  *
1515  * [ get_page() / put_page() increment/decrement the count. If count
1516  *   reaches 0 the page will be freed. ]
1517  *
1518  * This works nicely with pages from FSs.
1519  * But this means that in protocol A we might signal IO completion too early!
1520  *
1521  * In order not to corrupt data during a resync we must make sure
1522  * that we do not reuse our own buffer pages (EEs) too early, therefore
1523  * we have the net_ee list.
1524  *
1525  * XFS still seems to have problems: it submits pages with page_count == 0!
1526  * As a workaround, we disable sendpage on pages
1527  * with page_count == 0 or PageSlab.
1528  */
1529 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1530                               int offset, size_t size, unsigned msg_flags)
1531 {
1532         struct socket *socket;
1533         void *addr;
1534         int err;
1535
1536         socket = mdev->tconn->data.socket;
1537         addr = kmap(page) + offset;
1538         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1539         kunmap(page);
1540         if (!err)
1541                 mdev->send_cnt += size >> 9;
1542         return err;
1543 }
1544
1545 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1546                     int offset, size_t size, unsigned msg_flags)
1547 {
1548         struct socket *socket = mdev->tconn->data.socket;
1549         mm_segment_t oldfs = get_fs();
1550         int len = size;
1551         int err = -EIO;
1552
1553         /* e.g. XFS meta- & log-data is in slab pages, which have a
1554          * page_count of 0 and/or have PageSlab() set.
1555          * We cannot use sendpage for those, as it does get_page();
1556          * put_page(); that would either trigger a VM_BUG directly, or make
1557          * __page_cache_release free a page that is actually still referenced
1558          * by someone, leading to some obscure delayed Oops somewhere else. */
1559         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1560                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1561
1562         msg_flags |= MSG_NOSIGNAL;
1563         drbd_update_congested(mdev->tconn);
1564         set_fs(KERNEL_DS);
1565         do {
1566                 int sent;
1567
1568                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1569                 if (sent <= 0) {
1570                         if (sent == -EAGAIN) {
1571                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1572                                         break;
1573                                 continue;
1574                         }
1575                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1576                              __func__, (int)size, len, sent);
1577                         if (sent < 0)
1578                                 err = sent;
1579                         break;
1580                 }
1581                 len    -= sent;
1582                 offset += sent;
1583         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1584         set_fs(oldfs);
1585         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1586
1587         if (len == 0) {
1588                 err = 0;
1589                 mdev->send_cnt += size >> 9;
1590         }
1591         return err;
1592 }
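/* Sketch of the hazard described in the comment block above (illustrative,
 * not part of the original source):
 *
 *   sendpage(page)                  the skb takes a reference, data is not
 *                                   necessarily on the wire yet
 *   protocol A: write is completed towards the upper layers
 *   upper layers reuse or modify the page
 *   NIC finally transmits           the peer may see the modified contents
 *
 * This is why protocol A (and data-integrity) writes are copied into the
 * socket buffers via _drbd_no_send_page()/_drbd_send_bio(), and why our own
 * buffer pages (EEs) stay on the net_ee list until the network layer has
 * really released them. */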
1593
1594 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1595 {
1596         struct bio_vec *bvec;
1597         int i;
1598         /* hint all but last page with MSG_MORE */
1599         __bio_for_each_segment(bvec, bio, i, 0) {
1600                 int err;
1601
1602                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1603                                          bvec->bv_offset, bvec->bv_len,
1604                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1605                 if (err)
1606                         return err;
1607         }
1608         return 0;
1609 }
1610
1611 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1612 {
1613         struct bio_vec *bvec;
1614         int i;
1615         /* hint all but last page with MSG_MORE */
1616         __bio_for_each_segment(bvec, bio, i, 0) {
1617                 int err;
1618
1619                 err = _drbd_send_page(mdev, bvec->bv_page,
1620                                       bvec->bv_offset, bvec->bv_len,
1621                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1622                 if (err)
1623                         return err;
1624         }
1625         return 0;
1626 }
1627
1628 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1629                             struct drbd_peer_request *peer_req)
1630 {
1631         struct page *page = peer_req->pages;
1632         unsigned len = peer_req->i.size;
1633         int err;
1634
1635         /* hint all but last page with MSG_MORE */
1636         page_chain_for_each(page) {
1637                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1638
1639                 err = _drbd_send_page(mdev, page, 0, l,
1640                                       page_chain_next(page) ? MSG_MORE : 0);
1641                 if (err)
1642                         return err;
1643                 len -= l;
1644         }
1645         return 0;
1646 }
1647
1648 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1649 {
1650         if (mdev->tconn->agreed_pro_version >= 95)
1651                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1652                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1653                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1654                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1655         else
1656                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1657 }
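/* Example of the mapping above (descriptive only): a bio submitted with
 * REQ_SYNC | REQ_FUA is sent as DP_RW_SYNC | DP_FUA to a peer that agreed
 * on protocol 95 or newer; an older peer only gets DP_RW_SYNC, since it
 * does not understand the FUA/FLUSH/DISCARD hints. */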
1658
1659 /* Used to send write requests
1660  * R_PRIMARY -> Peer    (P_DATA)
1661  */
1662 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1663 {
1664         struct drbd_socket *sock;
1665         struct p_data *p;
1666         unsigned int dp_flags = 0;
1667         int dgs;
1668         int err;
1669
1670         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1671                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1672
1673         sock = &mdev->tconn->data;
1674         p = drbd_prepare_command(mdev, sock);
1675         if (!p)
1676                 return -EIO;
1677         p->sector = cpu_to_be64(req->i.sector);
1678         p->block_id = (unsigned long)req;
1679         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1680         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1681         if (mdev->state.conn >= C_SYNC_SOURCE &&
1682             mdev->state.conn <= C_PAUSED_SYNC_T)
1683                 dp_flags |= DP_MAY_SET_IN_SYNC;
1684         p->dp_flags = cpu_to_be32(dp_flags);
1685         if (dgs)
1686                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, p + 1);
1687         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1688         if (!err) {
1689                 /* For protocol A, we have to memcpy the payload into
1690                  * socket buffers, as we may complete the request right away,
1691                  * as soon as we have handed it over to tcp, at which point the data
1692                  * pages may become invalid.
1693                  *
1694                  * With data integrity enabled, we copy it as well, so we can be
1695                  * sure that even if the bio pages are still being modified, this
1696                  * won't change the data on the wire; thus, if the digest checks
1697                  * out ok after sending on this side but does not match on the
1698                  * receiving side, the corruption was certainly introduced elsewhere.
1699                  */
1700                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1701                         err = _drbd_send_bio(mdev, req->master_bio);
1702                 else
1703                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1704
1705                 /* double check digest, sometimes buffers have been modified in flight. */
1706                 if (dgs > 0 && dgs <= 64) {
1707                         /* 64 byte, 512 bit, is the largest digest size
1708                          * currently supported in kernel crypto. */
1709                         unsigned char digest[64];
1710                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1711                         if (memcmp(p + 1, digest, dgs)) {
1712                                 dev_warn(DEV,
1713                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1714                                         (unsigned long long)req->i.sector, req->i.size);
1715                         }
1716                 } /* else if (dgs > 64) {
1717                      ... Be noisy about digest too large ...
1718                 } */
1719         }
1720         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1721
1722         return err;
1723 }
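/* On-the-wire layout produced by drbd_send_dblock() (a sketch derived from
 * the code above):
 *
 *   struct p_data      sector, block_id, seq_num, dp_flags
 *   digest             dgs bytes, only if an integrity transform is set
 *   payload            req->i.size bytes of bio data
 *
 * The digest is computed before the payload goes out and recomputed after
 * sending; a mismatch means the upper layers modified the pages in flight,
 * not that the network corrupted them. */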
1724
1725 /* answer packet, used to send data back for read requests:
1726  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1727  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1728  */
1729 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1730                     struct drbd_peer_request *peer_req)
1731 {
1732         struct drbd_socket *sock;
1733         struct p_data *p;
1734         int err;
1735         int dgs;
1736
1737         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1738                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1739
1740         sock = &mdev->tconn->data;
1741         p = drbd_prepare_command(mdev, sock);
1742         if (!p)
1743                 return -EIO;
1744         p->sector = cpu_to_be64(peer_req->i.sector);
1745         p->block_id = peer_req->block_id;
1746         p->seq_num = 0;  /* unused */
1747         if (dgs)
1748                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, p + 1);
1749         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1750         if (!err)
1751                 err = _drbd_send_zc_ee(mdev, peer_req);
1752         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1753
1754         return err;
1755 }
1756
1757 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1758 {
1759         struct drbd_socket *sock;
1760         struct p_block_desc *p;
1761
1762         sock = &mdev->tconn->data;
1763         p = drbd_prepare_command(mdev, sock);
1764         if (!p)
1765                 return -EIO;
1766         p->sector = cpu_to_be64(req->i.sector);
1767         p->blksize = cpu_to_be32(req->i.size);
1768         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1769 }
1770
1771 /*
1772   drbd_send distinguishes two cases:
1773
1774   Packets sent via the data socket "sock"
1775   and packets sent via the meta data socket "msock"
1776
1777                     sock                      msock
1778   -----------------+-------------------------+------------------------------
1779   timeout           conf.timeout / 2          conf.timeout / 2
1780   timeout action    send a ping via msock     Abort communication
1781                                               and close all sockets
1782 */
1783
1784 /*
1785  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1786  */
1787 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1788               void *buf, size_t size, unsigned msg_flags)
1789 {
1790         struct kvec iov;
1791         struct msghdr msg;
1792         int rv, sent = 0;
1793
1794         if (!sock)
1795                 return -EBADR;
1796
1797         /* THINK  if (signal_pending) return ... ? */
1798
1799         iov.iov_base = buf;
1800         iov.iov_len  = size;
1801
1802         msg.msg_name       = NULL;
1803         msg.msg_namelen    = 0;
1804         msg.msg_control    = NULL;
1805         msg.msg_controllen = 0;
1806         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1807
1808         if (sock == tconn->data.socket) {
1809                 tconn->ko_count = tconn->net_conf->ko_count;
1810                 drbd_update_congested(tconn);
1811         }
1812         do {
1813                 /* STRANGE
1814                  * tcp_sendmsg does _not_ use its size parameter at all ?
1815                  *
1816                  * -EAGAIN on timeout, -EINTR on signal.
1817                  */
1818 /* THINK
1819  * do we need to block DRBD_SIG if sock == &meta.socket ??
1820  * otherwise wake_asender() might interrupt some send_*Ack !
1821  */
1822                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1823                 if (rv == -EAGAIN) {
1824                         if (we_should_drop_the_connection(tconn, sock))
1825                                 break;
1826                         else
1827                                 continue;
1828                 }
1829                 if (rv == -EINTR) {
1830                         flush_signals(current);
1831                         rv = 0;
1832                 }
1833                 if (rv < 0)
1834                         break;
1835                 sent += rv;
1836                 iov.iov_base += rv;
1837                 iov.iov_len  -= rv;
1838         } while (sent < size);
1839
1840         if (sock == tconn->data.socket)
1841                 clear_bit(NET_CONGESTED, &tconn->flags);
1842
1843         if (rv <= 0) {
1844                 if (rv != -EAGAIN) {
1845                         conn_err(tconn, "%s_sendmsg returned %d\n",
1846                                  sock == tconn->meta.socket ? "msock" : "sock",
1847                                  rv);
1848                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1849                 } else
1850                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1851         }
1852
1853         return sent;
1854 }
1855
1856 /**
1857  * drbd_send_all  -  Send an entire buffer
1858  *
1859  * Returns 0 upon success and a negative error value otherwise.
1860  */
1861 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1862                   size_t size, unsigned msg_flags)
1863 {
1864         int err;
1865
1866         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1867         if (err < 0)
1868                 return err;
1869         if (err != size)
1870                 return -EIO;
1871         return 0;
1872 }
1873
1874 static int drbd_open(struct block_device *bdev, fmode_t mode)
1875 {
1876         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1877         unsigned long flags;
1878         int rv = 0;
1879
1880         mutex_lock(&drbd_main_mutex);
1881         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1882         /* to have a stable mdev->state.role
1883          * and no race with updating open_cnt */
1884
1885         if (mdev->state.role != R_PRIMARY) {
1886                 if (mode & FMODE_WRITE)
1887                         rv = -EROFS;
1888                 else if (!allow_oos)
1889                         rv = -EMEDIUMTYPE;
1890         }
1891
1892         if (!rv)
1893                 mdev->open_cnt++;
1894         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1895         mutex_unlock(&drbd_main_mutex);
1896
1897         return rv;
1898 }
1899
1900 static int drbd_release(struct gendisk *gd, fmode_t mode)
1901 {
1902         struct drbd_conf *mdev = gd->private_data;
1903         mutex_lock(&drbd_main_mutex);
1904         mdev->open_cnt--;
1905         mutex_unlock(&drbd_main_mutex);
1906         return 0;
1907 }
1908
1909 static void drbd_set_defaults(struct drbd_conf *mdev)
1910 {
1911         /* Beware! The actual layout differs
1912          * between big endian and little endian */
1913         mdev->state = (union drbd_dev_state) {
1914                 { .role = R_SECONDARY,
1915                   .peer = R_UNKNOWN,
1916                   .conn = C_STANDALONE,
1917                   .disk = D_DISKLESS,
1918                   .pdsk = D_UNKNOWN,
1919                 } };
1920 }
1921
1922 void drbd_init_set_defaults(struct drbd_conf *mdev)
1923 {
1924         /* the memset(,0,) did most of this.
1925          * note: only assignments, no allocation in here */
1926
1927         drbd_set_defaults(mdev);
1928
1929         atomic_set(&mdev->ap_bio_cnt, 0);
1930         atomic_set(&mdev->ap_pending_cnt, 0);
1931         atomic_set(&mdev->rs_pending_cnt, 0);
1932         atomic_set(&mdev->unacked_cnt, 0);
1933         atomic_set(&mdev->local_cnt, 0);
1934         atomic_set(&mdev->pp_in_use_by_net, 0);
1935         atomic_set(&mdev->rs_sect_in, 0);
1936         atomic_set(&mdev->rs_sect_ev, 0);
1937         atomic_set(&mdev->ap_in_flight, 0);
1938
1939         mutex_init(&mdev->md_io_mutex);
1940         mutex_init(&mdev->own_state_mutex);
1941         mdev->state_mutex = &mdev->own_state_mutex;
1942
1943         spin_lock_init(&mdev->al_lock);
1944         spin_lock_init(&mdev->peer_seq_lock);
1945         spin_lock_init(&mdev->epoch_lock);
1946
1947         INIT_LIST_HEAD(&mdev->active_ee);
1948         INIT_LIST_HEAD(&mdev->sync_ee);
1949         INIT_LIST_HEAD(&mdev->done_ee);
1950         INIT_LIST_HEAD(&mdev->read_ee);
1951         INIT_LIST_HEAD(&mdev->net_ee);
1952         INIT_LIST_HEAD(&mdev->resync_reads);
1953         INIT_LIST_HEAD(&mdev->resync_work.list);
1954         INIT_LIST_HEAD(&mdev->unplug_work.list);
1955         INIT_LIST_HEAD(&mdev->go_diskless.list);
1956         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1957         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1958         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1959
1960         mdev->resync_work.cb  = w_resync_timer;
1961         mdev->unplug_work.cb  = w_send_write_hint;
1962         mdev->go_diskless.cb  = w_go_diskless;
1963         mdev->md_sync_work.cb = w_md_sync;
1964         mdev->bm_io_work.w.cb = w_bitmap_io;
1965         mdev->start_resync_work.cb = w_start_resync;
1966
1967         mdev->resync_work.mdev  = mdev;
1968         mdev->unplug_work.mdev  = mdev;
1969         mdev->go_diskless.mdev  = mdev;
1970         mdev->md_sync_work.mdev = mdev;
1971         mdev->bm_io_work.w.mdev = mdev;
1972         mdev->start_resync_work.mdev = mdev;
1973
1974         init_timer(&mdev->resync_timer);
1975         init_timer(&mdev->md_sync_timer);
1976         init_timer(&mdev->start_resync_timer);
1977         init_timer(&mdev->request_timer);
1978         mdev->resync_timer.function = resync_timer_fn;
1979         mdev->resync_timer.data = (unsigned long) mdev;
1980         mdev->md_sync_timer.function = md_sync_timer_fn;
1981         mdev->md_sync_timer.data = (unsigned long) mdev;
1982         mdev->start_resync_timer.function = start_resync_timer_fn;
1983         mdev->start_resync_timer.data = (unsigned long) mdev;
1984         mdev->request_timer.function = request_timer_fn;
1985         mdev->request_timer.data = (unsigned long) mdev;
1986
1987         init_waitqueue_head(&mdev->misc_wait);
1988         init_waitqueue_head(&mdev->state_wait);
1989         init_waitqueue_head(&mdev->ee_wait);
1990         init_waitqueue_head(&mdev->al_wait);
1991         init_waitqueue_head(&mdev->seq_wait);
1992
1993         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1994         mdev->write_ordering = WO_bdev_flush;
1995         mdev->resync_wenr = LC_FREE;
1996         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1997         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1998 }
1999
2000 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2001 {
2002         int i;
2003         if (mdev->tconn->receiver.t_state != NONE)
2004                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2005                                 mdev->tconn->receiver.t_state);
2006
2007         /* no need to lock it, I'm the only thread alive */
2008         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2009                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2010         mdev->al_writ_cnt  =
2011         mdev->bm_writ_cnt  =
2012         mdev->read_cnt     =
2013         mdev->recv_cnt     =
2014         mdev->send_cnt     =
2015         mdev->writ_cnt     =
2016         mdev->p_size       =
2017         mdev->rs_start     =
2018         mdev->rs_total     =
2019         mdev->rs_failed    = 0;
2020         mdev->rs_last_events = 0;
2021         mdev->rs_last_sect_ev = 0;
2022         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2023                 mdev->rs_mark_left[i] = 0;
2024                 mdev->rs_mark_time[i] = 0;
2025         }
2026         D_ASSERT(mdev->tconn->net_conf == NULL);
2027
2028         drbd_set_my_capacity(mdev, 0);
2029         if (mdev->bitmap) {
2030                 /* maybe never allocated. */
2031                 drbd_bm_resize(mdev, 0, 1);
2032                 drbd_bm_cleanup(mdev);
2033         }
2034
2035         drbd_free_resources(mdev);
2036         clear_bit(AL_SUSPENDED, &mdev->flags);
2037
2038         D_ASSERT(list_empty(&mdev->active_ee));
2039         D_ASSERT(list_empty(&mdev->sync_ee));
2040         D_ASSERT(list_empty(&mdev->done_ee));
2041         D_ASSERT(list_empty(&mdev->read_ee));
2042         D_ASSERT(list_empty(&mdev->net_ee));
2043         D_ASSERT(list_empty(&mdev->resync_reads));
2044         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2045         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2046         D_ASSERT(list_empty(&mdev->resync_work.list));
2047         D_ASSERT(list_empty(&mdev->unplug_work.list));
2048         D_ASSERT(list_empty(&mdev->go_diskless.list));
2049
2050         drbd_set_defaults(mdev);
2051 }
2052
2053
2054 static void drbd_destroy_mempools(void)
2055 {
2056         struct page *page;
2057
2058         while (drbd_pp_pool) {
2059                 page = drbd_pp_pool;
2060                 drbd_pp_pool = (struct page *)page_private(page);
2061                 __free_page(page);
2062                 drbd_pp_vacant--;
2063         }
2064
2065         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2066
2067         if (drbd_md_io_bio_set)
2068                 bioset_free(drbd_md_io_bio_set);
2069         if (drbd_md_io_page_pool)
2070                 mempool_destroy(drbd_md_io_page_pool);
2071         if (drbd_ee_mempool)
2072                 mempool_destroy(drbd_ee_mempool);
2073         if (drbd_request_mempool)
2074                 mempool_destroy(drbd_request_mempool);
2075         if (drbd_ee_cache)
2076                 kmem_cache_destroy(drbd_ee_cache);
2077         if (drbd_request_cache)
2078                 kmem_cache_destroy(drbd_request_cache);
2079         if (drbd_bm_ext_cache)
2080                 kmem_cache_destroy(drbd_bm_ext_cache);
2081         if (drbd_al_ext_cache)
2082                 kmem_cache_destroy(drbd_al_ext_cache);
2083
2084         drbd_md_io_bio_set   = NULL;
2085         drbd_md_io_page_pool = NULL;
2086         drbd_ee_mempool      = NULL;
2087         drbd_request_mempool = NULL;
2088         drbd_ee_cache        = NULL;
2089         drbd_request_cache   = NULL;
2090         drbd_bm_ext_cache    = NULL;
2091         drbd_al_ext_cache    = NULL;
2092
2093         return;
2094 }
2095
2096 static int drbd_create_mempools(void)
2097 {
2098         struct page *page;
2099         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2100         int i;
2101
2102         /* prepare our caches and mempools */
2103         drbd_request_mempool = NULL;
2104         drbd_ee_cache        = NULL;
2105         drbd_request_cache   = NULL;
2106         drbd_bm_ext_cache    = NULL;
2107         drbd_al_ext_cache    = NULL;
2108         drbd_pp_pool         = NULL;
2109         drbd_md_io_page_pool = NULL;
2110         drbd_md_io_bio_set   = NULL;
2111
2112         /* caches */
2113         drbd_request_cache = kmem_cache_create(
2114                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2115         if (drbd_request_cache == NULL)
2116                 goto Enomem;
2117
2118         drbd_ee_cache = kmem_cache_create(
2119                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2120         if (drbd_ee_cache == NULL)
2121                 goto Enomem;
2122
2123         drbd_bm_ext_cache = kmem_cache_create(
2124                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2125         if (drbd_bm_ext_cache == NULL)
2126                 goto Enomem;
2127
2128         drbd_al_ext_cache = kmem_cache_create(
2129                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2130         if (drbd_al_ext_cache == NULL)
2131                 goto Enomem;
2132
2133         /* mempools */
2134         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2135         if (drbd_md_io_bio_set == NULL)
2136                 goto Enomem;
2137
2138         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2139         if (drbd_md_io_page_pool == NULL)
2140                 goto Enomem;
2141
2142         drbd_request_mempool = mempool_create(number,
2143                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2144         if (drbd_request_mempool == NULL)
2145                 goto Enomem;
2146
2147         drbd_ee_mempool = mempool_create(number,
2148                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2149         if (drbd_ee_mempool == NULL)
2150                 goto Enomem;
2151
2152         /* drbd's page pool */
2153         spin_lock_init(&drbd_pp_lock);
2154
2155         for (i = 0; i < number; i++) {
2156                 page = alloc_page(GFP_HIGHUSER);
2157                 if (!page)
2158                         goto Enomem;
2159                 set_page_private(page, (unsigned long)drbd_pp_pool);
2160                 drbd_pp_pool = page;
2161         }
2162         drbd_pp_vacant = number;
2163
2164         return 0;
2165
2166 Enomem:
2167         drbd_destroy_mempools(); /* in case we allocated some */
2168         return -ENOMEM;
2169 }
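/* Sizing note (an estimate; it assumes DRBD_MAX_BIO_SIZE of 1 MiB and 4 KiB
 * pages, which may differ on other configurations): "number" above then works
 * out to 256 per configured minor, so the private page pool holds enough
 * pages for one maximally sized bio per device, and the request and
 * peer-request mempools get the same count as their minimum reserve. */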
2170
2171 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2172         void *unused)
2173 {
2174         /* just so we have it.  you never know what interesting things we
2175          * might want to do here some day...
2176          */
2177
2178         return NOTIFY_DONE;
2179 }
2180
2181 static struct notifier_block drbd_notifier = {
2182         .notifier_call = drbd_notify_sys,
2183 };
2184
2185 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2186 {
2187         int rr;
2188
2189         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2190         if (rr)
2191                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2192
2193         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2194         if (rr)
2195                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2196
2197         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2198         if (rr)
2199                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2200
2201         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2202         if (rr)
2203                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2204
2205         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2206         if (rr)
2207                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2208 }
2209
2210 /* caution. no locking. */
2211 void drbd_delete_device(struct drbd_conf *mdev)
2212 {
2213         idr_remove(&mdev->tconn->volumes, mdev->vnr);
2214         idr_remove(&minors, mdev_to_minor(mdev));
2215         synchronize_rcu();
2216
2217         /* paranoia asserts */
2218         D_ASSERT(mdev->open_cnt == 0);
2219         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2220         /* end paranoia asserts */
2221
2222         del_gendisk(mdev->vdisk);
2223
2224         /* cleanup stuff that may have been allocated during
2225          * device (re-)configuration or state changes */
2226
2227         if (mdev->this_bdev)
2228                 bdput(mdev->this_bdev);
2229
2230         drbd_free_resources(mdev);
2231
2232         drbd_release_all_peer_reqs(mdev);
2233
2234         lc_destroy(mdev->act_log);
2235         lc_destroy(mdev->resync);
2236
2237         kfree(mdev->p_uuid);
2238         /* mdev->p_uuid = NULL; */
2239
2240         kfree(mdev->current_epoch);
2241         if (mdev->bitmap) /* should no longer be there. */
2242                 drbd_bm_cleanup(mdev);
2243         __free_page(mdev->md_io_page);
2244         put_disk(mdev->vdisk);
2245         blk_cleanup_queue(mdev->rq_queue);
2246         kfree(mdev);
2247 }
2248
2249 static void drbd_cleanup(void)
2250 {
2251         unsigned int i;
2252         struct drbd_conf *mdev;
2253
2254         unregister_reboot_notifier(&drbd_notifier);
2255
2256         /* first remove proc,
2257          * drbdsetup uses its presence to detect
2258          * whether DRBD is loaded.
2259          * If we were to get stuck in proc removal,
2260          * but have netlink already deregistered,
2261          * some drbdsetup commands may wait forever
2262          * for an answer.
2263          */
2264         if (drbd_proc)
2265                 remove_proc_entry("drbd", NULL);
2266
2267         drbd_genl_unregister();
2268
2269         idr_for_each_entry(&minors, mdev, i)
2270                 drbd_delete_device(mdev);
2271
2272         drbd_destroy_mempools();
2273         unregister_blkdev(DRBD_MAJOR, "drbd");
2274
2275         idr_destroy(&minors);
2276
2277         printk(KERN_INFO "drbd: module cleanup done.\n");
2278 }
2279
2280 /**
2281  * drbd_congested() - Callback for pdflush
2282  * @congested_data:     User data
2283  * @bdi_bits:           Bits pdflush is currently interested in
2284  *
2285  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2286  */
2287 static int drbd_congested(void *congested_data, int bdi_bits)
2288 {
2289         struct drbd_conf *mdev = congested_data;
2290         struct request_queue *q;
2291         char reason = '-';
2292         int r = 0;
2293
2294         if (!may_inc_ap_bio(mdev)) {
2295                 /* DRBD has frozen IO */
2296                 r = bdi_bits;
2297                 reason = 'd';
2298                 goto out;
2299         }
2300
2301         if (get_ldev(mdev)) {
2302                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2303                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2304                 put_ldev(mdev);
2305                 if (r)
2306                         reason = 'b';
2307         }
2308
2309         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2310                 r |= (1 << BDI_async_congested);
2311                 reason = reason == 'b' ? 'a' : 'n';
2312         }
2313
2314 out:
2315         mdev->congestion_reason = reason;
2316         return r;
2317 }
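/* Legend for the congestion_reason characters set above (descriptive only):
 * 'd' = DRBD itself has frozen IO, 'b' = the backing device is congested,
 * 'n' = the replication link (TCP send buffer) is congested, 'a' = both
 * backing device and network, '-' = not congested. */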
2318
2319 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2320 {
2321         sema_init(&wq->s, 0);
2322         spin_lock_init(&wq->q_lock);
2323         INIT_LIST_HEAD(&wq->q);
2324 }
2325
2326 struct drbd_tconn *conn_by_name(const char *name)
2327 {
2328         struct drbd_tconn *tconn;
2329
2330         if (!name || !name[0])
2331                 return NULL;
2332
2333         mutex_lock(&drbd_cfg_mutex);
2334         list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2335                 if (!strcmp(tconn->name, name))
2336                         goto found;
2337         }
2338         tconn = NULL;
2339 found:
2340         mutex_unlock(&drbd_cfg_mutex);
2341         return tconn;
2342 }
2343
2344 static int drbd_alloc_socket(struct drbd_socket *socket)
2345 {
2346         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2347         if (!socket->rbuf)
2348                 return -ENOMEM;
2349         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2350         if (!socket->sbuf)
2351                 return -ENOMEM;
2352         return 0;
2353 }
2354
2355 static void drbd_free_socket(struct drbd_socket *socket)
2356 {
2357         free_page((unsigned long) socket->sbuf);
2358         free_page((unsigned long) socket->rbuf);
2359 }
2360
2361 struct drbd_tconn *drbd_new_tconn(const char *name)
2362 {
2363         struct drbd_tconn *tconn;
2364
2365         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2366         if (!tconn)
2367                 return NULL;
2368
2369         tconn->name = kstrdup(name, GFP_KERNEL);
2370         if (!tconn->name)
2371                 goto fail;
2372
2373         if (drbd_alloc_socket(&tconn->data))
2374                 goto fail;
2375         if (drbd_alloc_socket(&tconn->meta))
2376                 goto fail;
2377
2378         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2379                 goto fail;
2380
2381         if (!tl_init(tconn))
2382                 goto fail;
2383
2384         tconn->cstate = C_STANDALONE;
2385         mutex_init(&tconn->cstate_mutex);
2386         spin_lock_init(&tconn->req_lock);
2387         atomic_set(&tconn->net_cnt, 0);
2388         init_waitqueue_head(&tconn->net_cnt_wait);
2389         init_waitqueue_head(&tconn->ping_wait);
2390         idr_init(&tconn->volumes);
2391
2392         drbd_init_workqueue(&tconn->data.work);
2393         mutex_init(&tconn->data.mutex);
2394
2395         drbd_init_workqueue(&tconn->meta.work);
2396         mutex_init(&tconn->meta.mutex);
2397
2398         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2399         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2400         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2401
2402         tconn->res_opts = (struct res_opts) {
2403                 {}, 0, /* cpu_mask */
2404                 DRBD_ON_NO_DATA_DEF, /* on_no_data */
2405         };
2406
2407         mutex_lock(&drbd_cfg_mutex);
2408         list_add_tail(&tconn->all_tconn, &drbd_tconns);
2409         mutex_unlock(&drbd_cfg_mutex);
2410
2411         return tconn;
2412
2413 fail:
2414         tl_cleanup(tconn);
2415         free_cpumask_var(tconn->cpu_mask);
2416         drbd_free_socket(&tconn->meta);
2417         drbd_free_socket(&tconn->data);
2418         kfree(tconn->name);
2419         kfree(tconn);
2420
2421         return NULL;
2422 }
2423
2424 void drbd_free_tconn(struct drbd_tconn *tconn)
2425 {
2426         list_del(&tconn->all_tconn);
2427         idr_destroy(&tconn->volumes);
2428
2429         free_cpumask_var(tconn->cpu_mask);
2430         drbd_free_socket(&tconn->meta);
2431         drbd_free_socket(&tconn->data);
2432         kfree(tconn->name);
2433         kfree(tconn->int_dig_in);
2434         kfree(tconn->int_dig_vv);
2435         kfree(tconn);
2436 }
2437
2438 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2439 {
2440         struct drbd_conf *mdev;
2441         struct gendisk *disk;
2442         struct request_queue *q;
2443         int vnr_got = vnr;
2444         int minor_got = minor;
2445         enum drbd_ret_code err = ERR_NOMEM;
2446
2447         mdev = minor_to_mdev(minor);
2448         if (mdev)
2449                 return ERR_MINOR_EXISTS;
2450
2451         /* GFP_KERNEL, we are outside of all write-out paths */
2452         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2453         if (!mdev)
2454                 return ERR_NOMEM;
2455
2456         mdev->tconn = tconn;
2457         mdev->minor = minor;
2458         mdev->vnr = vnr;
2459
2460         drbd_init_set_defaults(mdev);
2461
2462         q = blk_alloc_queue(GFP_KERNEL);
2463         if (!q)
2464                 goto out_no_q;
2465         mdev->rq_queue = q;
2466         q->queuedata   = mdev;
2467
2468         disk = alloc_disk(1);
2469         if (!disk)
2470                 goto out_no_disk;
2471         mdev->vdisk = disk;
2472
2473         set_disk_ro(disk, true);
2474
2475         disk->queue = q;
2476         disk->major = DRBD_MAJOR;
2477         disk->first_minor = minor;
2478         disk->fops = &drbd_ops;
2479         sprintf(disk->disk_name, "drbd%d", minor);
2480         disk->private_data = mdev;
2481
2482         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2483         /* we have no partitions. we contain only ourselves. */
2484         mdev->this_bdev->bd_contains = mdev->this_bdev;
2485
2486         q->backing_dev_info.congested_fn = drbd_congested;
2487         q->backing_dev_info.congested_data = mdev;
2488
2489         blk_queue_make_request(q, drbd_make_request);
2490         /* Setting the max_hw_sectors to an odd value of 8 KiB here
2491            triggers a max_bio_size message upon first attach or connect. */
2492         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2493         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2494         blk_queue_merge_bvec(q, drbd_merge_bvec);
2495         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2496
2497         mdev->md_io_page = alloc_page(GFP_KERNEL);
2498         if (!mdev->md_io_page)
2499                 goto out_no_io_page;
2500
2501         if (drbd_bm_init(mdev))
2502                 goto out_no_bitmap;
2503         mdev->read_requests = RB_ROOT;
2504         mdev->write_requests = RB_ROOT;
2505
2506         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2507         if (!mdev->current_epoch)
2508                 goto out_no_epoch;
2509
2510         INIT_LIST_HEAD(&mdev->current_epoch->list);
2511         mdev->epochs = 1;
2512
2513         if (!idr_pre_get(&minors, GFP_KERNEL))
2514                 goto out_no_minor_idr;
2515         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2516                 goto out_no_minor_idr;
2517         if (minor_got != minor) {
2518                 err = ERR_MINOR_EXISTS;
2519                 drbd_msg_put_info("requested minor exists already");
2520                 goto out_idr_remove_minor;
2521         }
2522
2523         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2524                 goto out_idr_remove_minor;
2525         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2526                 goto out_idr_remove_minor;
2527         if (vnr_got != vnr) {
2528                 err = ERR_INVALID_REQUEST;
2529                 drbd_msg_put_info("requested volume exists already");
2530                 goto out_idr_remove_vol;
2531         }
2532         add_disk(disk);
2533
2534         /* inherit the connection state */
2535         mdev->state.conn = tconn->cstate;
2536         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2537                 drbd_connected(vnr, mdev, tconn);
2538
2539         return NO_ERROR;
2540
2541 out_idr_remove_vol:
2542         idr_remove(&tconn->volumes, vnr_got);
2543 out_idr_remove_minor:
2544         idr_remove(&minors, minor_got);
2545         synchronize_rcu();
2546 out_no_minor_idr:
2547         kfree(mdev->current_epoch);
2548 out_no_epoch:
2549         drbd_bm_cleanup(mdev);
2550 out_no_bitmap:
2551         __free_page(mdev->md_io_page);
2552 out_no_io_page:
2553         put_disk(disk);
2554 out_no_disk:
2555         blk_cleanup_queue(q);
2556 out_no_q:
2557         kfree(mdev);
2558         return err;
2559 }
2560
2561 int __init drbd_init(void)
2562 {
2563         int err;
2564
2565         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2566                 printk(KERN_ERR
2567                        "drbd: invalid minor_count (%d)\n", minor_count);
2568 #ifdef MODULE
2569                 return -EINVAL;
2570 #else
2571                 minor_count = 8;
2572 #endif
2573         }
2574
2575         err = register_blkdev(DRBD_MAJOR, "drbd");
2576         if (err) {
2577                 printk(KERN_ERR
2578                        "drbd: unable to register block device major %d\n",
2579                        DRBD_MAJOR);
2580                 return err;
2581         }
2582
2583         err = drbd_genl_register();
2584         if (err) {
2585                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2586                 goto fail;
2587         }
2588
2589
2590         register_reboot_notifier(&drbd_notifier);
2591
2592         /*
2593          * allocate all necessary structs
2594          */
2595         err = -ENOMEM;
2596
2597         init_waitqueue_head(&drbd_pp_wait);
2598
2599         drbd_proc = NULL; /* play safe for drbd_cleanup */
2600         idr_init(&minors);
2601
2602         err = drbd_create_mempools();
2603         if (err)
2604                 goto fail;
2605
2606         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2607         if (!drbd_proc) {
2608                 printk(KERN_ERR "drbd: unable to register proc file\n");
2609                 goto fail;
2610         }
2611
2612         rwlock_init(&global_state_lock);
2613         INIT_LIST_HEAD(&drbd_tconns);
2614
2615         printk(KERN_INFO "drbd: initialized. "
2616                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2617                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2618         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2619         printk(KERN_INFO "drbd: registered as block device major %d\n",
2620                 DRBD_MAJOR);
2621
2622         return 0; /* Success! */
2623
2624 fail:
2625         drbd_cleanup();
2626         if (err == -ENOMEM)
2627                 /* currently always the case */
2628                 printk(KERN_ERR "drbd: ran out of memory\n");
2629         else
2630                 printk(KERN_ERR "drbd: initialization failure\n");
2631         return err;
2632 }
2633
2634 void drbd_free_bc(struct drbd_backing_dev *ldev)
2635 {
2636         if (ldev == NULL)
2637                 return;
2638
2639         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2640         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2641
2642         kfree(ldev);
2643 }
2644
2645 void drbd_free_sock(struct drbd_tconn *tconn)
2646 {
2647         if (tconn->data.socket) {
2648                 mutex_lock(&tconn->data.mutex);
2649                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2650                 sock_release(tconn->data.socket);
2651                 tconn->data.socket = NULL;
2652                 mutex_unlock(&tconn->data.mutex);
2653         }
2654         if (tconn->meta.socket) {
2655                 mutex_lock(&tconn->meta.mutex);
2656                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2657                 sock_release(tconn->meta.socket);
2658                 tconn->meta.socket = NULL;
2659                 mutex_unlock(&tconn->meta.mutex);
2660         }
2661 }
2662
2663
2664 void drbd_free_resources(struct drbd_conf *mdev)
2665 {
2666         crypto_free_hash(mdev->tconn->csums_tfm);
2667         mdev->tconn->csums_tfm = NULL;
2668         crypto_free_hash(mdev->tconn->verify_tfm);
2669         mdev->tconn->verify_tfm = NULL;
2670         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2671         mdev->tconn->cram_hmac_tfm = NULL;
2672         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2673         mdev->tconn->integrity_w_tfm = NULL;
2674         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2675         mdev->tconn->integrity_r_tfm = NULL;
2676
2677         drbd_free_sock(mdev->tconn);
2678
2679         __no_warn(local,
2680                   drbd_free_bc(mdev->ldev);
2681                   mdev->ldev = NULL;);
2682 }
2683
2684 /* meta data management */
2685
2686 struct meta_data_on_disk {
2687         u64 la_size;           /* last agreed size. */
2688         u64 uuid[UI_SIZE];   /* UUIDs. */
2689         u64 device_uuid;
2690         u64 reserved_u64_1;
2691         u32 flags;             /* MDF */
2692         u32 magic;
2693         u32 md_size_sect;
2694         u32 al_offset;         /* offset to this block */
2695         u32 al_nr_extents;     /* important for restoring the AL */
2696               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2697         u32 bm_offset;         /* offset to the bitmap, from here */
2698         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2699         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2700         u32 reserved_u32[3];
2701
2702 } __packed;
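/* Layout note (descriptive, derived from drbd_md_sync()/drbd_md_read()
 * below): every field of this on-disk superblock is stored in big-endian
 * byte order, and the structure is written at the start of the meta-data
 * block at md_offset; drbd_md_sync() zeroes the first 512 bytes of the IO
 * page before filling it in. */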
2703
2704 /**
2705  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2706  * @mdev:       DRBD device.
2707  */
2708 void drbd_md_sync(struct drbd_conf *mdev)
2709 {
2710         struct meta_data_on_disk *buffer;
2711         sector_t sector;
2712         int i;
2713
2714         del_timer(&mdev->md_sync_timer);
2715         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2716         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2717                 return;
2718
2719         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
2720          * metadata even if we detach due to a disk failure! */
2721         if (!get_ldev_if_state(mdev, D_FAILED))
2722                 return;
2723
2724         mutex_lock(&mdev->md_io_mutex);
2725         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2726         memset(buffer, 0, 512);
2727
2728         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2729         for (i = UI_CURRENT; i < UI_SIZE; i++)
2730                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2731         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2732         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2733
2734         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2735         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2736         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2737         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2738         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2739
2740         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2741         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2742
2743         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2744         sector = mdev->ldev->md.md_offset;
2745
2746         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2747                 /* this was only a try anyway ... */
2748                 dev_err(DEV, "meta data update failed!\n");
2749                 drbd_chk_io_error(mdev, 1, true);
2750         }
2751
2752         /* Update mdev->ldev->md.la_size_sect,
2753          * since we updated it on metadata. */
2754         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2755
2756         mutex_unlock(&mdev->md_io_mutex);
2757         put_ldev(mdev);
2758 }
2759
2760 /**
2761  * drbd_md_read() - Reads in the meta data super block
2762  * @mdev:       DRBD device.
2763  * @bdev:       Device from which the meta data should be read in.
2764  *
2765  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2766  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2767  */
2768 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2769 {
2770         struct meta_data_on_disk *buffer;
2771         int i, rv = NO_ERROR;
2772
2773         if (!get_ldev_if_state(mdev, D_ATTACHING))
2774                 return ERR_IO_MD_DISK;
2775
2776         mutex_lock(&mdev->md_io_mutex);
2777         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2778
2779         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2780                 /* NOTE: can't do normal error processing here as this is
2781                    called BEFORE disk is attached */
2782                 dev_err(DEV, "Error while reading metadata.\n");
2783                 rv = ERR_IO_MD_DISK;
2784                 goto err;
2785         }
2786
2787         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2788                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2789                 rv = ERR_MD_INVALID;
2790                 goto err;
2791         }
2792         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2793                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2794                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2795                 rv = ERR_MD_INVALID;
2796                 goto err;
2797         }
2798         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2799                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2800                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2801                 rv = ERR_MD_INVALID;
2802                 goto err;
2803         }
2804         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2805                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2806                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2807                 rv = ERR_MD_INVALID;
2808                 goto err;
2809         }
2810
2811         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2812                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2813                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2814                 rv = ERR_MD_INVALID;
2815                 goto err;
2816         }
2817
2818         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2819         for (i = UI_CURRENT; i < UI_SIZE; i++)
2820                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2821         bdev->md.flags = be32_to_cpu(buffer->flags);
2822         bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
2823         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2824
2825         spin_lock_irq(&mdev->tconn->req_lock);
2826         if (mdev->state.conn < C_CONNECTED) {
2827                 int peer;
2828                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2829                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2830                 mdev->peer_max_bio_size = peer;
2831         }
2832         spin_unlock_irq(&mdev->tconn->req_lock);
2833
2834         if (bdev->dc.al_extents < 7)
2835                 bdev->dc.al_extents = 127;
2836
2837  err:
2838         mutex_unlock(&mdev->md_io_mutex);
2839         put_ldev(mdev);
2840
2841         return rv;
2842 }
2843
2844 /**
2845  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2846  * @mdev:       DRBD device.
2847  *
2848  * Call this function if you change anything that should be written to
2849  * the meta-data super block. This function sets MD_DIRTY, and starts a
2850  * timer that makes sure drbd_md_sync() gets called within five seconds.
2851  */
2852 #ifdef DEBUG
2853 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2854 {
2855         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2856                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2857                 mdev->last_md_mark_dirty.line = line;
2858                 mdev->last_md_mark_dirty.func = func;
2859         }
2860 }
2861 #else
2862 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2863 {
2864         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2865                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2866 }
2867 #endif
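/* Note on the two variants above (descriptive only): with DEBUG, the dirty
 * timer is armed for one second instead of five, and the call site that
 * dirtied the meta data is recorded in last_md_mark_dirty for debugging. */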
2868
2869 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2870 {
2871         int i;
2872
2873         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2874                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2875 }
2876
2877 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2878 {
2879         if (idx == UI_CURRENT) {
2880                 if (mdev->state.role == R_PRIMARY)
2881                         val |= 1;
2882                 else
2883                         val &= ~((u64)1);
2884
2885                 drbd_set_ed_uuid(mdev, val);
2886         }
2887
2888         mdev->ldev->md.uuid[idx] = val;
2889         drbd_md_mark_dirty(mdev);
2890 }
2891
2892
2893 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2894 {
2895         if (mdev->ldev->md.uuid[idx]) {
2896                 drbd_uuid_move_history(mdev);
2897                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2898         }
2899         _drbd_uuid_set(mdev, idx, val);
2900 }
2901
2902 /**
2903  * drbd_uuid_new_current() - Creates a new current UUID
2904  * @mdev:       DRBD device.
2905  *
2906  * Creates a new current UUID, and rotates the old current UUID into
2907  * the bitmap slot. Causes an incremental resync upon next connect.
2908  */
2909 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2910 {
2911         u64 val;
2912         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2913
2914         if (bm_uuid)
2915                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2916
2917         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2918
2919         get_random_bytes(&val, sizeof(u64));
2920         _drbd_uuid_set(mdev, UI_CURRENT, val);
2921         drbd_print_uuids(mdev, "new current UUID");
2922         /* get it to stable storage _now_ */
2923         drbd_md_sync(mdev);
2924 }
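/* Sketch of the rotation performed above (illustrative; the actual resync
 * decision is made elsewhere, during the connection handshake):
 *
 *   before:  UI_CURRENT = C          UI_BITMAP = 0 (usually)
 *   after:   UI_CURRENT = random     UI_BITMAP = C
 *
 * When the peer later reports C as its current UUID and we find C in our
 * bitmap slot, the peer is recognized as out of date and an incremental,
 * bitmap-based resync is started. */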
2925
2926 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2927 {
2928         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2929                 return;
2930
2931         if (val == 0) {
2932                 drbd_uuid_move_history(mdev);
2933                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2934                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2935         } else {
2936                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2937                 if (bm_uuid)
2938                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2939
2940                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2941         }
2942         drbd_md_mark_dirty(mdev);
2943 }
2944
2945 /**
2946  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2947  * @mdev:       DRBD device.
2948  *
2949  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2950  */
2951 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2952 {
2953         int rv = -EIO;
2954
2955         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2956                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2957                 drbd_md_sync(mdev);
2958                 drbd_bm_set_all(mdev);
2959
2960                 rv = drbd_bm_write(mdev);
2961
2962                 if (!rv) {
2963                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2964                         drbd_md_sync(mdev);
2965                 }
2966
2967                 put_ldev(mdev);
2968         }
2969
2970         return rv;
2971 }
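
/*
 * A hedged sketch of driving this io_fn synchronously from non-worker
 * context; the "why" string is illustrative and BM_LOCKED_MASK is assumed to
 * be the appropriate locking flag here:
 *
 *	int err = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *				 "set_n_write sketch", BM_LOCKED_MASK);
 *	if (err)
 *		dev_err(DEV, "writing the full-sync bitmap failed\n");
 */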
2972
2973 /**
2974  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2975  * @mdev:       DRBD device.
2976  *
2977  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2978  */
2979 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2980 {
2981         int rv = -EIO;
2982
2983         drbd_resume_al(mdev);
2984         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2985                 drbd_bm_clear_all(mdev);
2986                 rv = drbd_bm_write(mdev);
2987                 put_ldev(mdev);
2988         }
2989
2990         return rv;
2991 }
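
/*
 * Unlike drbd_bmio_set_n_write() above, this one also resumes the activity
 * log (drbd_resume_al) before touching the bitmap. For queueing it from
 * worker context, see the usage sketch after drbd_queue_bitmap_io() below.
 */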
2992
2993 static int w_bitmap_io(struct drbd_work *w, int unused)
2994 {
2995         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2996         struct drbd_conf *mdev = w->mdev;
2997         int rv = -EIO;
2998
2999         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3000
3001         if (get_ldev(mdev)) {
3002                 drbd_bm_lock(mdev, work->why, work->flags);
3003                 rv = work->io_fn(mdev);
3004                 drbd_bm_unlock(mdev);
3005                 put_ldev(mdev);
3006         }
3007
3008         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3009         wake_up(&mdev->misc_wait);
3010
3011         if (work->done)
3012                 work->done(mdev, rv);
3013
3014         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3015         work->why = NULL;
3016         work->flags = 0;
3017
3018         return 0;
3019 }
3020
3021 void drbd_ldev_destroy(struct drbd_conf *mdev)
3022 {
3023         lc_destroy(mdev->resync);
3024         mdev->resync = NULL;
3025         lc_destroy(mdev->act_log);
3026         mdev->act_log = NULL;
3027         __no_warn(local,
3028                 drbd_free_bc(mdev->ldev);
3029                 mdev->ldev = NULL;);
3030
3031         clear_bit(GO_DISKLESS, &mdev->flags);
3032 }
3033
3034 static int w_go_diskless(struct drbd_work *w, int unused)
3035 {
3036         struct drbd_conf *mdev = w->mdev;
3037
3038         D_ASSERT(mdev->state.disk == D_FAILED);
3039         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3040          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3041          * the protected members anymore, though, so once put_ldev reaches zero
3042          * again, it will be safe to free them. */
3043         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3044         return 0;
3045 }
3046
3047 void drbd_go_diskless(struct drbd_conf *mdev)
3048 {
3049         D_ASSERT(mdev->state.disk == D_FAILED);
3050         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3051                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3052 }
3053
3054 /**
3055  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3056  * @mdev:       DRBD device.
3057  * @io_fn:      IO callback to be called when bitmap IO is possible
3058  * @done:       callback to be called after the bitmap IO was performed
3059  * @why:        Descriptive text of the reason for doing the IO
3060  *
3061  * While IO on the bitmap happens we freeze application IO, thus ensuring
3062  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3063  * called from worker context. It MUST NOT be used while a previous such
3064  * work is still pending!
3065  */
3066 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3067                           int (*io_fn)(struct drbd_conf *),
3068                           void (*done)(struct drbd_conf *, int),
3069                           char *why, enum bm_flag flags)
3070 {
3071         D_ASSERT(current == mdev->tconn->worker.task);
3072
3073         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3074         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3075         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3076         if (mdev->bm_io_work.why)
3077                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3078                         why, mdev->bm_io_work.why);
3079
3080         mdev->bm_io_work.io_fn = io_fn;
3081         mdev->bm_io_work.done = done;
3082         mdev->bm_io_work.why = why;
3083         mdev->bm_io_work.flags = flags;
3084
3085         spin_lock_irq(&mdev->tconn->req_lock);
3086         set_bit(BITMAP_IO, &mdev->flags);
3087         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3088                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3089                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3090         }
3091         spin_unlock_irq(&mdev->tconn->req_lock);
3092 }
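
/*
 * A minimal usage sketch from worker context, under the rules stated in the
 * comment above; the done callback and "why" string are hypothetical:
 *
 *	static void my_bitmap_io_done(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap IO failed: %d\n", rv);
 *	}
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_clear_n_write, my_bitmap_io_done,
 *			     "clear_n_write sketch", BM_LOCKED_MASK);
 */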
3093
3094 /**
3095  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3096  * @mdev:       DRBD device.
3097  * @io_fn:      IO callback to be called when bitmap IO is possible
3098  * @why:        Descriptive text of the reason for doing the IO
3099  *
3100  * Freezes application IO while the actual IO operation runs. This
3101  * function MAY NOT be called from worker context.
3102  */
3103 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3104                 char *why, enum bm_flag flags)
3105 {
3106         int rv;
3107
3108         D_ASSERT(current != mdev->tconn->worker.task);
3109
3110         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3111                 drbd_suspend_io(mdev);
3112
3113         drbd_bm_lock(mdev, why, flags);
3114         rv = io_fn(mdev);
3115         drbd_bm_unlock(mdev);
3116
3117         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3118                 drbd_resume_io(mdev);
3119
3120         return rv;
3121 }
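
/*
 * Note on the flags: when BM_LOCKED_SET_ALLOWED is set, application IO is not
 * suspended/resumed around the bitmap operation. For a plain synchronous
 * call, see the sketch after drbd_bmio_set_n_write() above.
 */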
3122
3123 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3124 {
3125         if ((mdev->ldev->md.flags & flag) != flag) {
3126                 drbd_md_mark_dirty(mdev);
3127                 mdev->ldev->md.flags |= flag;
3128         }
3129 }
3130
3131 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3132 {
3133         if ((mdev->ldev->md.flags & flag) != 0) {
3134                 drbd_md_mark_dirty(mdev);
3135                 mdev->ldev->md.flags &= ~flag;
3136         }
3137 }
3138 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3139 {
3140         return (bdev->md.flags & flag) != 0;
3141 }
3142
3143 static void md_sync_timer_fn(unsigned long data)
3144 {
3145         struct drbd_conf *mdev = (struct drbd_conf *) data;
3146
3147         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3148 }
3149
3150 static int w_md_sync(struct drbd_work *w, int unused)
3151 {
3152         struct drbd_conf *mdev = w->mdev;
3153
3154         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3155 #ifdef DEBUG
3156         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3157                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3158 #endif
3159         drbd_md_sync(mdev);
3160         return 0;
3161 }
3162
3163 const char *cmdname(enum drbd_packet cmd)
3164 {
3165         /* THINK may need to become several global tables
3166          * when we want to support more than
3167          * one PRO_VERSION */
3168         static const char *cmdnames[] = {
3169                 [P_DATA]                = "Data",
3170                 [P_DATA_REPLY]          = "DataReply",
3171                 [P_RS_DATA_REPLY]       = "RSDataReply",
3172                 [P_BARRIER]             = "Barrier",
3173                 [P_BITMAP]              = "ReportBitMap",
3174                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3175                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3176                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3177                 [P_DATA_REQUEST]        = "DataRequest",
3178                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3179                 [P_SYNC_PARAM]          = "SyncParam",
3180                 [P_SYNC_PARAM89]        = "SyncParam89",
3181                 [P_PROTOCOL]            = "ReportProtocol",
3182                 [P_UUIDS]               = "ReportUUIDs",
3183                 [P_SIZES]               = "ReportSizes",
3184                 [P_STATE]               = "ReportState",
3185                 [P_SYNC_UUID]           = "ReportSyncUUID",
3186                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3187                 [P_AUTH_RESPONSE]       = "AuthResponse",
3188                 [P_PING]                = "Ping",
3189                 [P_PING_ACK]            = "PingAck",
3190                 [P_RECV_ACK]            = "RecvAck",
3191                 [P_WRITE_ACK]           = "WriteAck",
3192                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3193                 [P_DISCARD_WRITE]       = "DiscardWrite",
3194                 [P_NEG_ACK]             = "NegAck",
3195                 [P_NEG_DREPLY]          = "NegDReply",
3196                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3197                 [P_BARRIER_ACK]         = "BarrierAck",
3198                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3199                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3200                 [P_OV_REQUEST]          = "OVRequest",
3201                 [P_OV_REPLY]            = "OVReply",
3202                 [P_OV_RESULT]           = "OVResult",
3203                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3204                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3205                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3206                 [P_DELAY_PROBE]         = "DelayProbe",
3207                 [P_OUT_OF_SYNC]         = "OutOfSync",
3208                 [P_RETRY_WRITE]         = "RetryWrite",
3209         };
3210
3211         if (cmd == P_INITIAL_META)
3212                 return "InitialMeta";
3213         if (cmd == P_INITIAL_DATA)
3214                 return "InitialData";
3215         if (cmd == P_CONNECTION_FEATURES)
3216                 return "ConnectionFeatures";
3217         if (cmd >= ARRAY_SIZE(cmdnames))
3218                 return "Unknown";
3219         return cmdnames[cmd] ? cmdnames[cmd] : "Unknown"; /* array has gaps */
3220 }
3221
3222 /**
3223  * drbd_wait_misc  -  wait for a request to make progress
3224  * @mdev:       device associated with the request
3225  * @i:          the struct drbd_interval embedded in struct drbd_request or
3226  *              struct drbd_peer_request
3227  */
3228 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3229 {
3230         struct net_conf *net_conf = mdev->tconn->net_conf;
3231         DEFINE_WAIT(wait);
3232         long timeout;
3233
3234         if (!net_conf)
3235                 return -ETIMEDOUT;
3236         timeout = MAX_SCHEDULE_TIMEOUT;
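        /* net_conf->timeout is configured in tenths of a second (hence the
         * HZ / 10 scaling below); with ko_count set, the wait is bounded by
         * timeout * ko_count, otherwise it is effectively unbounded. */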
3237         if (net_conf->ko_count)
3238                 timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
3239
3240         /* Indicate to wake up mdev->misc_wait on progress.  */
3241         i->waiting = true;
3242         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3243         spin_unlock_irq(&mdev->tconn->req_lock);
3244         timeout = schedule_timeout(timeout);
3245         finish_wait(&mdev->misc_wait, &wait);
3246         spin_lock_irq(&mdev->tconn->req_lock);
3247         if (!timeout || mdev->state.conn < C_CONNECTED)
3248                 return -ETIMEDOUT;
3249         if (signal_pending(current))
3250                 return -ERESTARTSYS;
3251         return 0;
3252 }
3253
3254 #ifdef CONFIG_DRBD_FAULT_INJECTION
3255 /* Fault insertion support including random number generator shamelessly
3256  * stolen from kernel/rcutorture.c */
3257 struct fault_random_state {
3258         unsigned long state;
3259         unsigned long count;
3260 };
3261
3262 #define FAULT_RANDOM_MULT 39916801  /* prime */
3263 #define FAULT_RANDOM_ADD        479001701 /* prime */
3264 #define FAULT_RANDOM_REFRESH 10000
3265
3266 /*
3267  * Crude but fast random-number generator.  Uses a linear congruential
3268  * generator, with occasional help from get_random_bytes().
3269  */
3270 static unsigned long
3271 _drbd_fault_random(struct fault_random_state *rsp)
3272 {
3273         long refresh;
3274
3275         if (!rsp->count--) {
3276                 get_random_bytes(&refresh, sizeof(refresh));
3277                 rsp->state += refresh;
3278                 rsp->count = FAULT_RANDOM_REFRESH;
3279         }
3280         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3281         return swahw32(rsp->state);
3282 }
3283
3284 static char *
3285 _drbd_fault_str(unsigned int type) {
3286         static char *_faults[] = {
3287                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3288                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3289                 [DRBD_FAULT_RS_WR] = "Resync write",
3290                 [DRBD_FAULT_RS_RD] = "Resync read",
3291                 [DRBD_FAULT_DT_WR] = "Data write",
3292                 [DRBD_FAULT_DT_RD] = "Data read",
3293                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3294                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3295                 [DRBD_FAULT_AL_EE] = "EE allocation",
3296                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3297         };
3298
3299         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3300 }
3301
3302 unsigned int
3303 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3304 {
3305         static struct fault_random_state rrs = {0, 0};
3306
3307         unsigned int ret = (
3308                 (fault_devs == 0 ||
3309                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3310                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3311
3312         if (ret) {
3313                 fault_count++;
3314
3315                 if (__ratelimit(&drbd_ratelimit_state))
3316                         dev_warn(DEV, "***Simulating %s failure\n",
3317                                 _drbd_fault_str(type));
3318         }
3319
3320         return ret;
3321 }
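
/*
 * A hedged usage sketch: IO-path callers typically check a wrapper such as
 * drbd_insert_fault() and fail the request artificially when it fires; the
 * wrapper name and bio handling below are assumptions, not taken from this
 * file:
 *
 *	if (drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
 *		bio_endio(bio, -EIO);
 *	else
 *		generic_make_request(bio);
 */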
3322 #endif
3323
3324 const char *drbd_buildtag(void)
3325 {
3326         /* A DRBD module built from external sources carries a reference to
3327            the git hash of its source code here. */
3328
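        /* Note: the string intentionally starts with a NUL byte; patching
         * buildtag[0] to 'b' below turns it into "built-in" when there is no
         * srcversion to report. */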
3329         static char buildtag[38] = "\0uilt-in";
3330
3331         if (buildtag[0] == 0) {
3332 #ifdef CONFIG_MODULES
3333                 if (THIS_MODULE != NULL)
3334                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3335                 else
3336 #endif
3337                         buildtag[0] = 'b';
3338         }
3339
3340         return buildtag;
3341 }
3342
3343 module_init(drbd_init)
3344 module_exit(drbd_cleanup)
3345
3346 EXPORT_SYMBOL(drbd_conn_str);
3347 EXPORT_SYMBOL(drbd_role_str);
3348 EXPORT_SYMBOL(drbd_disk_str);
3349 EXPORT_SYMBOL(drbd_set_st_err_str);