/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
   from Logicworks, Inc. for making SDP replication support possible.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>
#include <linux/drbd.h>
#include <asm/uaccess.h>
#include <asm/types.h>
#include <linux/ctype.h>
#include <linux/mutex.h>
#include <linux/file.h>
#include <linux/proc_fs.h>
#include <linux/init.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/reboot.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>

#include <linux/drbd_limits.h>
#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
static DEFINE_MUTEX(drbd_main_mutex);
int drbdd_init(struct drbd_thread *);
int drbd_worker(struct drbd_thread *);
int drbd_asender(struct drbd_thread *);

static int drbd_open(struct block_device *bdev, fmode_t mode);
static int drbd_release(struct gendisk *gd, fmode_t mode);
static int w_md_sync(struct drbd_work *w, int unused);
static void md_sync_timer_fn(unsigned long data);
static int w_bitmap_io(struct drbd_work *w, int unused);
static int w_go_diskless(struct drbd_work *w, int unused);
MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
	      "Lars Ellenberg <lars@linbit.com>");
MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
MODULE_VERSION(REL_VERSION);
MODULE_LICENSE("GPL");
MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
		 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
#include <linux/moduleparam.h>
/* allow_open_on_secondary */
MODULE_PARM_DESC(allow_oos, "DONT USE!");
/* thanks to these macros, if compiled into the kernel (not-module),
 * this becomes the boot parameter drbd.minor_count */
module_param(minor_count, uint, 0444);
module_param(disable_sendpage, bool, 0644);
module_param(allow_oos, bool, 0);
module_param(proc_details, int, 0644);
#ifdef CONFIG_DRBD_FAULT_INJECTION
int enable_faults;
int fault_rate;
static int fault_count;
int fault_devs;
/* bitmap of enabled faults */
module_param(enable_faults, int, 0664);
/* fault rate % value - applies to all enabled faults */
module_param(fault_rate, int, 0664);
/* count of faults inserted */
module_param(fault_count, int, 0664);
/* bitmap of devices to insert faults on */
module_param(fault_devs, int, 0644);
#endif
/* module parameter, defined */
unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
int disable_sendpage;
int allow_oos;
int proc_details; /* Detail level in proc drbd */

/* Module parameter for setting the user mode helper program
 * to run. Default is /sbin/drbdadm */
char usermode_helper[80] = "/sbin/drbdadm";

module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
/* in 2.6.x, our device mapping and config info contains our virtual gendisks
 * as member "struct gendisk *vdisk;"
 */
struct list_head drbd_tconns;  /* list of struct drbd_tconn */

struct kmem_cache *drbd_request_cache;
struct kmem_cache *drbd_ee_cache;	/* peer requests */
struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
mempool_t *drbd_request_mempool;
mempool_t *drbd_ee_mempool;
mempool_t *drbd_md_io_page_pool;
struct bio_set *drbd_md_io_bio_set;
/* I do not use a standard mempool, because:
   1) I want to hand out the pre-allocated objects first.
   2) I want to be able to interrupt sleeping allocation with a signal.
   Note: This is a singly linked list, the next pointer is the private
	 member of struct page.
 */
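/*
 * Illustration only, not used by the driver: a page stack like the one
 * described above is manipulated through the page_private() accessors.
 * The helper names below are made up for this sketch; the real push/pop
 * happens inline in drbd_create_mempools()/drbd_destroy_mempools() further
 * down in this file.
 *
 *	static void example_push(struct page **stack, struct page *page)
 *	{
 *		set_page_private(page, (unsigned long)*stack);
 *		*stack = page;
 *	}
 *
 *	static struct page *example_pop(struct page **stack)
 *	{
 *		struct page *page = *stack;
 *
 *		if (page)
 *			*stack = (struct page *)page_private(page);
 *		return page;
 *	}
 */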
struct page *drbd_pp_pool;
spinlock_t drbd_pp_lock;
wait_queue_head_t drbd_pp_wait;

DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
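/* The declaration above allows bursts of at most 5 rate-limited messages
 * per 5*HZ jiffies: DEFINE_RATELIMIT_STATE() takes the interval argument
 * first, then the burst count. */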
static const struct block_device_operations drbd_ops = {
	.owner   = THIS_MODULE,
	.open    = drbd_open,
	.release = drbd_release,
};
static void bio_destructor_drbd(struct bio *bio)
{
	bio_free(bio, drbd_md_io_bio_set);
}

struct bio *bio_alloc_drbd(gfp_t gfp_mask)
{
	struct bio *bio;

	if (!drbd_md_io_bio_set)
		return bio_alloc(gfp_mask, 1);

	bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
	if (!bio)
		return NULL;
	bio->bi_destructor = bio_destructor_drbd;
	return bio;
}
/* When checking with sparse, and this is an inline function, sparse will
   give tons of false positives. When this is a real function, sparse works.
 */
int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
{
	int io_allowed;

	atomic_inc(&mdev->local_cnt);
	io_allowed = (mdev->state.disk >= mins);
	if (!io_allowed) {
		if (atomic_dec_and_test(&mdev->local_cnt))
			wake_up(&mdev->misc_wait);
	}
	return io_allowed;
}
/**
 * DOC: The transfer log
 *
 * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
 * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
 * of the list. There is always at least one &struct drbd_tl_epoch object.
 *
 * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
 * attached.
 */
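/*
 * Illustration only: given the fields described above, walking all epochs
 * from oldest to newest under req_lock looks like this (see _tl_restart()
 * below for the real traversal):
 *
 *	struct drbd_tl_epoch *b;
 *
 *	for (b = tconn->oldest_tle; b != NULL; b = b->next)
 *		;	(visit epoch b, e.g. iterate &b->requests)
 *
 * The requests of one epoch hang off &b->requests via their tl_requests
 * list_head, which is why tl_release() uses list_for_each_safe() there.
 */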
static int tl_init(struct drbd_tconn *tconn)
{
	struct drbd_tl_epoch *b;

	/* during device minor initialization, we may well use GFP_KERNEL */
	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
	if (!b)
		return 0;
	INIT_LIST_HEAD(&b->requests);
	INIT_LIST_HEAD(&b->w.list);
	b->next = NULL;
	b->br_number = 4711;
	b->n_writes = 0;
	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */

	tconn->oldest_tle = b;
	tconn->newest_tle = b;
	INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
	INIT_LIST_HEAD(&tconn->barrier_acked_requests);

	return 1;
}
static void tl_cleanup(struct drbd_tconn *tconn)
{
	if (tconn->oldest_tle != tconn->newest_tle)
		conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
	if (!list_empty(&tconn->out_of_sequence_requests))
		conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
	kfree(tconn->oldest_tle);
	tconn->oldest_tle = NULL;
	kfree(tconn->unused_spare_tle);
	tconn->unused_spare_tle = NULL;
}
/**
 * _tl_add_barrier() - Adds a barrier to the transfer log
 * @tconn:	DRBD connection.
 * @new:	Barrier to be added before the current head of the TL.
 *
 * The caller must hold the req_lock.
 */
void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
{
	struct drbd_tl_epoch *newest_before;

	INIT_LIST_HEAD(&new->requests);
	INIT_LIST_HEAD(&new->w.list);
	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
	new->next = NULL;
	new->n_writes = 0;

	newest_before = tconn->newest_tle;
	/* never send a barrier number == 0, because that is special-cased
	 * when using TCQ for our write ordering code */
	new->br_number = (newest_before->br_number+1) ?: 1;
	if (tconn->newest_tle != new) {
		tconn->newest_tle->next = new;
		tconn->newest_tle = new;
	}
}
/**
 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
 * @tconn:	DRBD connection.
 * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
 * @set_size:	Expected number of requests before that barrier.
 *
 * In case the passed barrier_nr or set_size does not match the oldest
 * &struct drbd_tl_epoch object, this function will cause a termination
 * of the connection.
 */
void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
		unsigned int set_size)
{
	struct drbd_conf *mdev;
	struct drbd_tl_epoch *b, *nob; /* next old barrier */
	struct list_head *le, *tle;
	struct drbd_request *r;

	spin_lock_irq(&tconn->req_lock);

	b = tconn->oldest_tle;

	/* first some paranoia code */
	if (b == NULL) {
		conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
			 barrier_nr);
		goto bail;
	}
	if (b->br_number != barrier_nr) {
		conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
			 barrier_nr, b->br_number);
		goto bail;
	}
	if (b->n_writes != set_size) {
		conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
			 barrier_nr, set_size, b->n_writes);
		goto bail;
	}

	/* Clean up list of requests processed during current epoch */
	list_for_each_safe(le, tle, &b->requests) {
		r = list_entry(le, struct drbd_request, tl_requests);
		_req_mod(r, BARRIER_ACKED);
	}
	/* There could be requests on the list waiting for completion
	   of the write to the local disk. To avoid corruption of
	   slab's data structures we have to remove the list's head.

	   Also there could have been a barrier ack out of sequence, overtaking
	   the write acks - which would be a bug and would violate write ordering.
	   To not deadlock in case we lose connection while such requests are
	   still pending, we need some way to find them for the
	   _req_mod(CONNECTION_LOST_WHILE_PENDING).

	   These have been list_move'd to the out_of_sequence_requests list in
	   _req_mod(, BARRIER_ACKED) above.
	   */
	list_splice_init(&b->requests, &tconn->barrier_acked_requests);
	mdev = b->w.mdev;

	nob = b->next;
	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
		_tl_add_barrier(tconn, b);
		if (nob)
			tconn->oldest_tle = nob;
		/* if nob == NULL b was the only barrier, and becomes the new
		   barrier. Therefore tconn->oldest_tle already points to b */
	} else {
		D_ASSERT(nob != NULL);
		tconn->oldest_tle = nob;
		kfree(b);
	}

	spin_unlock_irq(&tconn->req_lock);
	dec_ap_pending(mdev);

	return;

bail:
	spin_unlock_irq(&tconn->req_lock);
	conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}
/**
 * _tl_restart() - Walks the transfer log, and applies an action to all requests
 * @tconn:	DRBD connection.
 * @what:	The action/event to perform with all request objects
 *
 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
 * RESTART_FROZEN_DISK_IO.
 */
void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
{
	struct drbd_tl_epoch *b, *tmp, **pn;
	struct list_head *le, *tle, carry_reads;
	struct drbd_request *req;
	int rv, n_writes, n_reads;

	b = tconn->oldest_tle;
	pn = &tconn->oldest_tle;
	while (b) {
		n_writes = 0;
		n_reads = 0;
		INIT_LIST_HEAD(&carry_reads);
		list_for_each_safe(le, tle, &b->requests) {
			req = list_entry(le, struct drbd_request, tl_requests);
			rv = _req_mod(req, what);

			n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
			n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
		}
		tmp = b->next;

		if (n_writes) {
			if (what == RESEND) {
				b->n_writes = n_writes;
				if (b->w.cb == NULL) {
					b->w.cb = w_send_barrier;
					inc_ap_pending(b->w.mdev);
					set_bit(CREATE_BARRIER, &b->w.mdev->flags);
				}

				drbd_queue_work(&tconn->data.work, &b->w);
			}
			pn = &b->next;
		} else {
			if (n_reads)
				list_add(&carry_reads, &b->requests);
			/* there could still be requests on that ring list,
			 * in case local io is still pending */
			list_del(&b->requests);

			/* dec_ap_pending corresponding to queue_barrier.
			 * the newest barrier may not have been queued yet,
			 * in which case w.cb is still NULL. */
			if (b->w.cb != NULL)
				dec_ap_pending(b->w.mdev);

			if (b == tconn->newest_tle) {
				/* recycle, but reinit! */
				if (tmp != NULL)
					conn_err(tconn, "ASSERT FAILED tmp == NULL");
				INIT_LIST_HEAD(&b->requests);
				list_splice(&carry_reads, &b->requests);
				INIT_LIST_HEAD(&b->w.list);
				b->w.cb = NULL;
				b->br_number = net_random();
				b->n_writes = 0;

				*pn = b;
				break;
			}
			*pn = tmp;
			kfree(b);
		}
		b = tmp;
		list_splice(&carry_reads, &b->requests);
	}

	/* Actions operating on the disk state, also want to work on
	   requests that got barrier acked. */
	switch (what) {
	case FAIL_FROZEN_DISK_IO:
	case RESTART_FROZEN_DISK_IO:
		list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
			req = list_entry(le, struct drbd_request, tl_requests);
			_req_mod(req, what);
		}
	case CONNECTION_LOST_WHILE_PENDING:
	case RESEND:
		break;
	default:
		conn_err(tconn, "what = %d in _tl_restart()\n", what);
	}
}
/**
 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
 * @tconn:	DRBD connection.
 *
 * This is called after the connection to the peer was lost. The storage covered
 * by the requests on the transfer log gets marked as out of sync. Called from the
 * receiver thread and the worker thread.
 */
void tl_clear(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	struct list_head *le, *tle;
	struct drbd_request *r;
	int vnr;

	spin_lock_irq(&tconn->req_lock);

	_tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);

	/* we expect this list to be empty. */
	if (!list_empty(&tconn->out_of_sequence_requests))
		conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");

	/* but just in case, clean it up anyways! */
	list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
		r = list_entry(le, struct drbd_request, tl_requests);
		/* It would be nice to complete outside of spinlock.
		 * But this is easier for now. */
		_req_mod(r, CONNECTION_LOST_WHILE_PENDING);
	}

	/* ensure bit indicating barrier is required is clear */
	rcu_read_lock();
	idr_for_each_entry(&tconn->volumes, mdev, vnr)
		clear_bit(CREATE_BARRIER, &mdev->flags);
	rcu_read_unlock();

	spin_unlock_irq(&tconn->req_lock);
}
void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
{
	spin_lock_irq(&tconn->req_lock);
	_tl_restart(tconn, what);
	spin_unlock_irq(&tconn->req_lock);
}
/**
 * tl_apply() - Applies an event to all requests for a certain mdev in the TL
 * @mdev:	DRBD device.
 * @what:	The action/event to perform with all request objects
 *
 * @what might only be ABORT_DISK_IO.
 */
void tl_apply(struct drbd_conf *mdev, enum drbd_req_event what)
{
	struct drbd_tconn *tconn = mdev->tconn;
	struct drbd_tl_epoch *b;
	struct list_head *le, *tle;
	struct drbd_request *req;

	D_ASSERT(what == ABORT_DISK_IO);

	spin_lock_irq(&tconn->req_lock);
	b = tconn->oldest_tle;
	while (b) {
		list_for_each_safe(le, tle, &b->requests) {
			req = list_entry(le, struct drbd_request, tl_requests);
			if (req->w.mdev == mdev)
				_req_mod(req, what);
		}
		b = b->next;
	}

	list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
		req = list_entry(le, struct drbd_request, tl_requests);
		if (req->w.mdev == mdev)
			_req_mod(req, what);
	}

	spin_unlock_irq(&tconn->req_lock);
}
static int drbd_thread_setup(void *arg)
{
	struct drbd_thread *thi = (struct drbd_thread *) arg;
	struct drbd_tconn *tconn = thi->tconn;
	unsigned long flags;
	int retval;

	snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
		 thi->name[0], thi->tconn->name);

restart:
	retval = thi->function(thi);

	spin_lock_irqsave(&thi->t_lock, flags);

	/* if the receiver has been "EXITING", the last thing it did
	 * was set the conn state to "StandAlone",
	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
	 * and receiver thread will be "started".
	 * drbd_thread_start needs to set "RESTARTING" in that case.
	 * t_state check and assignment needs to be within the same spinlock,
	 * so either thread_start sees EXITING, and can remap to RESTARTING,
	 * or thread_start sees NONE, and can proceed as normal.
	 */

	if (thi->t_state == RESTARTING) {
		conn_info(tconn, "Restarting %s thread\n", thi->name);
		thi->t_state = RUNNING;
		spin_unlock_irqrestore(&thi->t_lock, flags);
		goto restart;
	}

	thi->task = NULL;
	thi->t_state = NONE;
	smp_mb();
	complete_all(&thi->stop);
	spin_unlock_irqrestore(&thi->t_lock, flags);

	conn_info(tconn, "Terminating %s\n", current->comm);

	/* Release mod reference taken when thread was started */

	kref_put(&tconn->kref, &conn_destroy);
	module_put(THIS_MODULE);
	return retval;
}
static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
			     int (*func) (struct drbd_thread *), char *name)
{
	spin_lock_init(&thi->t_lock);
	thi->task    = NULL;
	thi->t_state = NONE;
	thi->function = func;
	thi->tconn = tconn;
	strncpy(thi->name, name, ARRAY_SIZE(thi->name));
}
int drbd_thread_start(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct task_struct *nt;
	unsigned long flags;

	/* is used from state engine doing drbd_thread_stop_nowait,
	 * while holding the req lock irqsave */
	spin_lock_irqsave(&thi->t_lock, flags);

	switch (thi->t_state) {
	case NONE:
		conn_info(tconn, "Starting %s thread (from %s [%d])\n",
			  thi->name, current->comm, current->pid);

		/* Get ref on module for thread - this is released when thread exits */
		if (!try_module_get(THIS_MODULE)) {
			conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
			spin_unlock_irqrestore(&thi->t_lock, flags);
			return false;
		}

		kref_get(&thi->tconn->kref);

		init_completion(&thi->stop);
		thi->reset_cpu_mask = 1;
		thi->t_state = RUNNING;
		spin_unlock_irqrestore(&thi->t_lock, flags);
		flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */

		nt = kthread_create(drbd_thread_setup, (void *) thi,
				    "drbd_%c_%s", thi->name[0], thi->tconn->name);

		if (IS_ERR(nt)) {
			conn_err(tconn, "Couldn't start thread\n");

			kref_put(&tconn->kref, &conn_destroy);
			module_put(THIS_MODULE);
			return false;
		}
		spin_lock_irqsave(&thi->t_lock, flags);
		thi->task = nt;
		thi->t_state = RUNNING;
		spin_unlock_irqrestore(&thi->t_lock, flags);
		wake_up_process(nt);
		break;
	case EXITING:
		thi->t_state = RESTARTING;
		conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
			  thi->name, current->comm, current->pid);
		/* fall through */
	case RUNNING:
	case RESTARTING:
	default:
		spin_unlock_irqrestore(&thi->t_lock, flags);
		break;
	}

	return true;
}
void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
{
	unsigned long flags;

	enum drbd_thread_state ns = restart ? RESTARTING : EXITING;

	/* may be called from state engine, holding the req lock irqsave */
	spin_lock_irqsave(&thi->t_lock, flags);

	if (thi->t_state == NONE) {
		spin_unlock_irqrestore(&thi->t_lock, flags);
		if (restart)
			drbd_thread_start(thi);
		return;
	}

	if (thi->t_state != ns) {
		if (thi->task == NULL) {
			spin_unlock_irqrestore(&thi->t_lock, flags);
			return;
		}

		thi->t_state = ns;
		smp_mb();
		init_completion(&thi->stop);
		if (thi->task != current)
			force_sig(DRBD_SIGKILL, thi->task);
	}

	spin_unlock_irqrestore(&thi->t_lock, flags);

	if (wait)
		wait_for_completion(&thi->stop);
}
static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
{
	struct drbd_thread *thi =
		task == tconn->receiver.task ? &tconn->receiver :
		task == tconn->asender.task  ? &tconn->asender :
		task == tconn->worker.task   ? &tconn->worker : NULL;

	return thi;
}

char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
{
	struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
	return thi ? thi->name : task->comm;
}
int conn_lowest_minor(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	int vnr = 0, m;

	rcu_read_lock();
	mdev = idr_get_next(&tconn->volumes, &vnr);
	m = mdev ? mdev_to_minor(mdev) : -1;
	rcu_read_unlock();

	return m;
}
/**
 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
 * @tconn:	DRBD connection.
 *
 * Forces all threads of a device onto the same CPU. This is beneficial for
 * DRBD's performance. May be overridden by user's configuration.
 */
void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
{
	int ord, cpu;

	/* user override. */
	if (cpumask_weight(tconn->cpu_mask))
		return;

	ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
	for_each_online_cpu(cpu) {
		if (ord-- == 0) {
			cpumask_set_cpu(cpu, tconn->cpu_mask);
			return;
		}
	}
	/* should not be reached */
	cpumask_setall(tconn->cpu_mask);
}
/**
 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
 * @thi:	drbd_thread object
 *
 * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
 * prematurely.
 */
void drbd_thread_current_set_cpu(struct drbd_thread *thi)
{
	struct task_struct *p = current;

	if (!thi->reset_cpu_mask)
		return;
	thi->reset_cpu_mask = 0;
	set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
}
/**
 * drbd_header_size  -  size of a packet header
 *
 * The header size is a multiple of 8, so any payload following the header is
 * word aligned on 64-bit architectures. (The bitmap send and receive code
 * relies on this.)
 */
unsigned int drbd_header_size(struct drbd_tconn *tconn)
{
	if (tconn->agreed_pro_version >= 100) {
		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
		return sizeof(struct p_header100);
	} else {
		BUILD_BUG_ON(sizeof(struct p_header80) !=
			     sizeof(struct p_header95));
		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
		return sizeof(struct p_header80);
	}
}
static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
{
	h->magic   = cpu_to_be32(DRBD_MAGIC);
	h->command = cpu_to_be16(cmd);
	h->length  = cpu_to_be16(size);
	return sizeof(struct p_header80);
}

static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
{
	h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
	h->command = cpu_to_be16(cmd);
	h->length  = cpu_to_be32(size);
	return sizeof(struct p_header95);
}

static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
				      int size, int vnr)
{
	h->magic   = cpu_to_be32(DRBD_MAGIC_100);
	h->volume  = cpu_to_be16(vnr);
	h->command = cpu_to_be16(cmd);
	h->length  = cpu_to_be32(size);
	return sizeof(struct p_header100);
}

static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
				   void *buffer, enum drbd_packet cmd, int size)
{
	if (tconn->agreed_pro_version >= 100)
		return prepare_header100(buffer, cmd, size, vnr);
	else if (tconn->agreed_pro_version >= 95 &&
		 size > DRBD_MAX_SIZE_H80_PACKET)
		return prepare_header95(buffer, cmd, size);
	else
		return prepare_header80(buffer, cmd, size);
}
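/*
 * For orientation (not a wire-format spec; the p_header{80,95,100} structs
 * are authoritative): the header variant is picked purely from the agreed
 * protocol version and the payload size,
 *
 *	agreed_pro_version >= 100                 -> p_header100
 *	>= 95 && size > DRBD_MAX_SIZE_H80_PACKET  -> p_header95 (32 bit length)
 *	otherwise                                 -> p_header80 (16 bit length)
 *
 * and a peer can tell the variants apart by their magic: DRBD_MAGIC_100,
 * the 16 bit DRBD_MAGIC_BIG, and DRBD_MAGIC respectively.
 */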
static void *__conn_prepare_command(struct drbd_tconn *tconn,
				    struct drbd_socket *sock)
{
	if (!sock->socket)
		return NULL;
	return sock->sbuf + drbd_header_size(tconn);
}

void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
{
	void *p;

	mutex_lock(&sock->mutex);
	p = __conn_prepare_command(tconn, sock);
	if (!p)
		mutex_unlock(&sock->mutex);

	return p;
}

void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
{
	return conn_prepare_command(mdev->tconn, sock);
}
static int __send_command(struct drbd_tconn *tconn, int vnr,
			  struct drbd_socket *sock, enum drbd_packet cmd,
			  unsigned int header_size, void *data,
			  unsigned int size)
{
	int msg_flags;
	int err;

	/*
	 * Called with @data == NULL and the size of the data blocks in @size
	 * for commands that send data blocks.  For those commands, omit the
	 * MSG_MORE flag: this will increase the likelihood that data blocks
	 * which are page aligned on the sender will end up page aligned on the
	 * receiving side.
	 */
	msg_flags = data ? MSG_MORE : 0;

	header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
				      header_size + size);
	err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
			    msg_flags);
	if (data && !err)
		err = drbd_send_all(tconn, sock->socket, data, size, 0);
	return err;
}
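/*
 * Sketch of the resulting wire pattern for a command with a data payload
 * (e.g. P_DATA in drbd_send_dblock() below): the header, plus a possible
 * digest placed in sbuf right behind it, goes out first with MSG_MORE,
 * then the payload follows on the same socket,
 *
 *	__send_command(tconn, vnr, sock, P_DATA,
 *		       sizeof(struct p_data) + dgs,	(header_size)
 *		       NULL, req->i.size);		(data, size)
 *
 * Here @data is NULL because the bio pages are pushed separately via
 * _drbd_send_bio()/_drbd_send_zc_bio() while the socket mutex is held.
 */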
static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
			       enum drbd_packet cmd, unsigned int header_size,
			       void *data, unsigned int size)
{
	return __send_command(tconn, 0, sock, cmd, header_size, data, size);
}

int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
		      enum drbd_packet cmd, unsigned int header_size,
		      void *data, unsigned int size)
{
	int err;

	err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
	mutex_unlock(&sock->mutex);
	return err;
}

int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
		      enum drbd_packet cmd, unsigned int header_size,
		      void *data, unsigned int size)
{
	int err;

	err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
			     data, size);
	mutex_unlock(&sock->mutex);
	return err;
}
int drbd_send_ping(struct drbd_tconn *tconn)
{
	struct drbd_socket *sock;

	sock = &tconn->meta;
	if (!conn_prepare_command(tconn, sock))
		return -EIO;
	return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
}

int drbd_send_ping_ack(struct drbd_tconn *tconn)
{
	struct drbd_socket *sock;

	sock = &tconn->meta;
	if (!conn_prepare_command(tconn, sock))
		return -EIO;
	return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
}
int drbd_send_sync_param(struct drbd_conf *mdev)
{
	struct drbd_socket *sock;
	struct p_rs_param_95 *p;
	int size;
	const int apv = mdev->tconn->agreed_pro_version;
	enum drbd_packet cmd;
	struct net_conf *nc;
	struct disk_conf *dc;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;

	rcu_read_lock();
	nc = rcu_dereference(mdev->tconn->net_conf);

	size = apv <= 87 ? sizeof(struct p_rs_param)
		: apv == 88 ? sizeof(struct p_rs_param)
			+ strlen(nc->verify_alg) + 1
		: apv <= 94 ? sizeof(struct p_rs_param_89)
		: /* apv >= 95 */ sizeof(struct p_rs_param_95);

	cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;

	/* initialize verify_alg and csums_alg */
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	if (get_ldev(mdev)) {
		dc = rcu_dereference(mdev->ldev->disk_conf);
		p->resync_rate = cpu_to_be32(dc->resync_rate);
		p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
		p->c_delay_target = cpu_to_be32(dc->c_delay_target);
		p->c_fill_target = cpu_to_be32(dc->c_fill_target);
		p->c_max_rate = cpu_to_be32(dc->c_max_rate);
		put_ldev(mdev);
	} else {
		p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
		p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
		p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
		p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
		p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
	}

	if (apv >= 88)
		strcpy(p->verify_alg, nc->verify_alg);
	if (apv >= 89)
		strcpy(p->csums_alg, nc->csums_alg);
	rcu_read_unlock();

	return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
}
int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
{
	struct drbd_socket *sock;
	struct p_protocol *p;
	struct net_conf *nc;
	int size, cf;

	sock = &tconn->data;
	p = __conn_prepare_command(tconn, sock);
	if (!p)
		return -EIO;

	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);

	if (nc->tentative && tconn->agreed_pro_version < 92) {
		rcu_read_unlock();
		mutex_unlock(&sock->mutex);
		conn_err(tconn, "--dry-run is not supported by peer");
		return -EOPNOTSUPP;
	}

	size = sizeof(*p);
	if (tconn->agreed_pro_version >= 87)
		size += strlen(nc->integrity_alg) + 1;

	p->protocol      = cpu_to_be32(nc->wire_protocol);
	p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
	p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
	p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
	p->two_primaries = cpu_to_be32(nc->two_primaries);
	cf = 0;
	if (nc->discard_my_data)
		cf |= CF_DISCARD_MY_DATA;
	if (nc->tentative)
		cf |= CF_DRY_RUN;
	p->conn_flags    = cpu_to_be32(cf);

	if (tconn->agreed_pro_version >= 87)
		strcpy(p->integrity_alg, nc->integrity_alg);
	rcu_read_unlock();

	return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
}

int drbd_send_protocol(struct drbd_tconn *tconn)
{
	int err;

	mutex_lock(&tconn->data.mutex);
	err = __drbd_send_protocol(tconn, P_PROTOCOL);
	mutex_unlock(&tconn->data.mutex);

	return err;
}
int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
{
	struct drbd_socket *sock;
	struct p_uuids *p;
	int i;

	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
		return 0;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p) {
		put_ldev(mdev);
		return -EIO;
	}
	for (i = UI_CURRENT; i < UI_SIZE; i++)
		p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;

	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
	p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
	rcu_read_lock();
	uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
	rcu_read_unlock();
	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
	p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);

	put_ldev(mdev);
	return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
}

int drbd_send_uuids(struct drbd_conf *mdev)
{
	return _drbd_send_uuids(mdev, 0);
}

int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
{
	return _drbd_send_uuids(mdev, 8);
}
void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
{
	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
		u64 *uuid = mdev->ldev->md.uuid;
		dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
			 text,
			 (unsigned long long)uuid[UI_CURRENT],
			 (unsigned long long)uuid[UI_BITMAP],
			 (unsigned long long)uuid[UI_HISTORY_START],
			 (unsigned long long)uuid[UI_HISTORY_END]);
		put_ldev(mdev);
	} else {
		dev_info(DEV, "%s effective data uuid: %016llX\n",
			 text,
			 (unsigned long long)mdev->ed_uuid);
	}
}
void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
{
	struct drbd_socket *sock;
	struct p_rs_uuid *p;
	u64 uuid;

	D_ASSERT(mdev->state.disk == D_UP_TO_DATE);

	uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
	drbd_uuid_set(mdev, UI_BITMAP, uuid);
	drbd_print_uuids(mdev, "updated sync UUID");
	drbd_md_sync(mdev);

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (p) {
		p->uuid = cpu_to_be64(uuid);
		drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
	}
}
int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
{
	struct drbd_socket *sock;
	struct p_sizes *p;
	sector_t d_size, u_size;
	int q_order_type, max_bio_size;

	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
		D_ASSERT(mdev->ldev->backing_bdev);
		d_size = drbd_get_max_capacity(mdev->ldev);
		rcu_read_lock();
		u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
		rcu_read_unlock();
		q_order_type = drbd_queue_order_type(mdev);
		max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
		put_ldev(mdev);
	} else {
		d_size = 0;
		u_size = 0;
		q_order_type = QUEUE_ORDERED_NONE;
		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
	}

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;

	if (mdev->tconn->agreed_pro_version <= 94)
		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
	else if (mdev->tconn->agreed_pro_version < 100)
		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);

	p->d_size = cpu_to_be64(d_size);
	p->u_size = cpu_to_be64(u_size);
	p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
	p->max_bio_size = cpu_to_be32(max_bio_size);
	p->queue_order_type = cpu_to_be16(q_order_type);
	p->dds_flags = cpu_to_be16(flags);
	return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
}
/**
 * drbd_send_state() - Sends the drbd state to the peer
 * @mdev:	DRBD device.
 */
int drbd_send_state(struct drbd_conf *mdev)
{
	struct drbd_socket *sock;
	struct p_state *p;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
	return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
}

int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
{
	struct drbd_socket *sock;
	struct p_req_state *p;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->mask = cpu_to_be32(mask.i);
	p->val = cpu_to_be32(val.i);
	return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
}
int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
{
	enum drbd_packet cmd;
	struct drbd_socket *sock;
	struct p_req_state *p;

	cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
	sock = &tconn->data;
	p = conn_prepare_command(tconn, sock);
	if (!p)
		return -EIO;
	p->mask = cpu_to_be32(mask.i);
	p->val = cpu_to_be32(val.i);
	return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
}
void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
{
	struct drbd_socket *sock;
	struct p_req_state_reply *p;

	sock = &mdev->tconn->meta;
	p = drbd_prepare_command(mdev, sock);
	if (p) {
		p->retcode = cpu_to_be32(retcode);
		drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
	}
}

void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
{
	struct drbd_socket *sock;
	struct p_req_state_reply *p;
	enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;

	sock = &tconn->meta;
	p = conn_prepare_command(tconn, sock);
	if (p) {
		p->retcode = cpu_to_be32(retcode);
		conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
	}
}
static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
{
	BUG_ON(code & ~0xf);
	p->encoding = (p->encoding & ~0xf) | code;
}

static void dcbp_set_start(struct p_compressed_bm *p, int set)
{
	p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
}

static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
{
	BUG_ON(n & ~0x7);
	p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
}
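/*
 * Summary of the encoding byte the three setters above maintain (derived
 * from the masks they use, not a separate wire-format definition):
 *
 *	bit  7    : whether the first run length describes set bits
 *	bits 6..4 : number of unused pad bits at the end of the code
 *	bits 3..0 : enum drbd_bitmap_code, e.g. RLE_VLI_Bits
 *
 * Note that dcbp_set_pad_bits() clears the low nibble, so callers set the
 * code last (as send_bitmap_rle_or_plain() below does).
 */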
int fill_bitmap_rle_bits(struct drbd_conf *mdev,
			 struct p_compressed_bm *p,
			 unsigned int size,
			 struct bm_xfer_ctx *c)
{
	struct bitstream bs;
	unsigned long plain_bits;
	unsigned long tmp;
	unsigned long rl;
	unsigned len;
	unsigned toggle;
	int bits, use_rle;

	/* may we use this feature? */
	rcu_read_lock();
	use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
	rcu_read_unlock();
	if (!use_rle || mdev->tconn->agreed_pro_version < 90)
		return 0;

	if (c->bit_offset >= c->bm_bits)
		return 0; /* nothing to do. */

	/* use at most thus many bytes */
	bitstream_init(&bs, p->code, size, 0);
	memset(p->code, 0, size);
	/* plain bits covered in this code string */
	plain_bits = 0;

	/* p->encoding & 0x80 stores whether the first run length is set.
	 * bit offset is implicit.
	 * start with toggle == 2 to be able to tell the first iteration */
	toggle = 2;

	/* see how much plain bits we can stuff into one packet
	 * using RLE and VLI. */
	do {
		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
				    : _drbd_bm_find_next(mdev, c->bit_offset);
		if (tmp == -1UL)
			tmp = c->bm_bits;
		rl = tmp - c->bit_offset;

		if (toggle == 2) { /* first iteration */
			if (rl == 0) {
				/* the first checked bit was set,
				 * store start value, */
				dcbp_set_start(p, 1);
				/* but skip encoding of zero run length */
				toggle = !toggle;
				continue;
			}
			dcbp_set_start(p, 0);
		}

		/* paranoia: catch zero runlength.
		 * can only happen if bitmap is modified while we scan it. */
		if (rl == 0) {
			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
			    "t:%u bo:%lu\n", toggle, c->bit_offset);
			return -1;
		}

		bits = vli_encode_bits(&bs, rl);
		if (bits == -ENOBUFS) /* buffer full */
			break;
		if (bits <= 0) {
			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
			return 0;
		}

		toggle = !toggle;
		plain_bits += rl;
		c->bit_offset = tmp;
	} while (c->bit_offset < c->bm_bits);

	len = bs.cur.b - p->code + !!bs.cur.bit;

	if (plain_bits < (len << 3)) {
		/* incompressible with this method.
		 * we need to rewind both word and bit position. */
		c->bit_offset -= plain_bits;
		bm_xfer_ctx_bit_to_word_offset(c);
		c->bit_offset = c->word_offset * BITS_PER_LONG;
		return 0;
	}

	/* RLE + VLI was able to compress it just fine.
	 * update c->word_offset. */
	bm_xfer_ctx_bit_to_word_offset(c);

	/* store pad_bits */
	dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);

	return len;
}
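/*
 * Compressibility check above, by example (numbers made up for
 * illustration): if the loop encoded plain_bits = 16384 bitmap bits into
 * len = 128 code bytes, the payload shrinks 16x and we return len. If it
 * instead took 300 bytes to cover 2000 bits, then plain_bits < len << 3
 * (2400), the encoding is a net loss, we rewind c->bit_offset and return 0
 * so the caller falls back to sending plain bitmap words.
 */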
/**
 * send_bitmap_rle_or_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
{
	struct drbd_socket *sock = &mdev->tconn->data;
	unsigned int header_size = drbd_header_size(mdev->tconn);
	struct p_compressed_bm *p = sock->sbuf + header_size;
	int len, err;

	len = fill_bitmap_rle_bits(mdev, p,
			DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
	if (len < 0)
		return -EIO;

	if (len) {
		dcbp_set_code(p, RLE_VLI_Bits);
		err = __send_command(mdev->tconn, mdev->vnr, sock,
				     P_COMPRESSED_BITMAP, sizeof(*p) + len,
				     NULL, 0);
		c->packets[0]++;
		c->bytes[0] += header_size + sizeof(*p) + len;

		if (c->bit_offset >= c->bm_bits)
			len = 0; /* DONE */
	} else {
		/* was not compressible.
		 * send a buffer full of plain text bits instead. */
		unsigned int data_size;
		unsigned long num_words;
		unsigned long *p = sock->sbuf + header_size;

		data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
		num_words = min_t(size_t, data_size / sizeof(*p),
				  c->bm_words - c->word_offset);
		len = num_words * sizeof(*p);
		if (len)
			drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
		err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
		c->word_offset += num_words;
		c->bit_offset = c->word_offset * BITS_PER_LONG;

		c->packets[1]++;
		c->bytes[1] += header_size + len;

		if (c->bit_offset > c->bm_bits)
			c->bit_offset = c->bm_bits;
	}
	if (!err) {
		if (len == 0) {
			INFO_bm_xfer_stats(mdev, "send", c);
			return 0;
		} else
			return 1;
	}
	return -EIO;
}
/* See the comment at receive_bitmap() */
static int _drbd_send_bitmap(struct drbd_conf *mdev)
{
	struct bm_xfer_ctx c;
	int err;

	if (!expect(mdev->bitmap))
		return false;

	if (get_ldev(mdev)) {
		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
			drbd_bm_set_all(mdev);
			if (drbd_bm_write(mdev)) {
				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
				 * but otherwise process as per normal - need to tell other
				 * side that a full resync is required! */
				dev_err(DEV, "Failed to write bitmap to disk!\n");
			} else {
				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
				drbd_md_sync(mdev);
			}
		}
		put_ldev(mdev);
	}

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(mdev),
		.bm_words = drbd_bm_words(mdev),
	};

	do {
		err = send_bitmap_rle_or_plain(mdev, &c);
	} while (err > 0);

	return err == 0;
}
int drbd_send_bitmap(struct drbd_conf *mdev)
{
	struct drbd_socket *sock = &mdev->tconn->data;
	int err = -1;

	mutex_lock(&sock->mutex);
	if (sock->socket)
		err = !_drbd_send_bitmap(mdev);
	mutex_unlock(&sock->mutex);
	return err;
}
void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
{
	struct drbd_socket *sock;
	struct p_barrier_ack *p;

	if (mdev->state.conn < C_CONNECTED)
		return;

	sock = &mdev->tconn->meta;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return;
	p->barrier = barrier_nr;
	p->set_size = cpu_to_be32(set_size);
	drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
}
/**
 * _drbd_send_ack() - Sends an ack packet
 * @mdev:	DRBD device.
 * @cmd:	Packet command code.
 * @sector:	sector, needs to be in big endian byte order
 * @blksize:	size in bytes, needs to be in big endian byte order
 * @block_id:	Id, big endian byte order
 */
static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
			  u64 sector, u32 blksize, u64 block_id)
{
	struct drbd_socket *sock;
	struct p_block_ack *p;

	if (mdev->state.conn < C_CONNECTED)
		return -EIO;

	sock = &mdev->tconn->meta;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->sector = sector;
	p->block_id = block_id;
	p->blksize = blksize;
	p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
	return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
}
/* dp->sector and dp->block_id already/still in network byte order,
 * data_size is payload size according to dp->head,
 * and may need to be corrected for digest size. */
void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
		      struct p_data *dp, int data_size)
{
	if (mdev->tconn->peer_integrity_tfm)
		data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
	_drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
		       dp->block_id);
}

void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
		      struct p_block_req *rp)
{
	_drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
}
/**
 * drbd_send_ack() - Sends an ack packet
 * @mdev:	DRBD device
 * @cmd:	packet command code
 * @peer_req:	peer request
 */
int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
		  struct drbd_peer_request *peer_req)
{
	return _drbd_send_ack(mdev, cmd,
			      cpu_to_be64(peer_req->i.sector),
			      cpu_to_be32(peer_req->i.size),
			      peer_req->block_id);
}
/* This function misuses the block_id field to signal if the blocks
 * are in sync or not. */
int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
		     sector_t sector, int blksize, u64 block_id)
{
	return _drbd_send_ack(mdev, cmd,
			      cpu_to_be64(sector),
			      cpu_to_be32(blksize),
			      cpu_to_be64(block_id));
}
int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
		       sector_t sector, int size, u64 block_id)
{
	struct drbd_socket *sock;
	struct p_block_req *p;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->sector = cpu_to_be64(sector);
	p->block_id = block_id;
	p->blksize = cpu_to_be32(size);
	return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
}
int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
			    void *digest, int digest_size, enum drbd_packet cmd)
{
	struct drbd_socket *sock;
	struct p_block_req *p;

	/* FIXME: Put the digest into the preallocated socket buffer. */

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->sector = cpu_to_be64(sector);
	p->block_id = ID_SYNCER /* unused */;
	p->blksize = cpu_to_be32(size);
	return drbd_send_command(mdev, sock, cmd, sizeof(*p),
				 digest, digest_size);
}
int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_socket *sock;
	struct p_block_req *p;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->sector = cpu_to_be64(sector);
	p->block_id = ID_SYNCER /* unused */;
	p->blksize = cpu_to_be32(size);
	return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
}
/* called on sndtimeo
 * returns false if we should retry,
 * true if we think connection is dead
 */
static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
{
	int drop_it;
	/* long elapsed = (long)(jiffies - mdev->last_received); */

	drop_it =   tconn->meta.socket == sock
		|| !tconn->asender.task
		|| get_t_state(&tconn->asender) != RUNNING
		|| tconn->cstate < C_WF_REPORT_PARAMS;

	if (drop_it)
		return true;

	drop_it = !--tconn->ko_count;
	if (!drop_it) {
		conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
			 current->comm, current->pid, tconn->ko_count);
		request_ping(tconn);
	}

	return drop_it; /* && (mdev->state == R_PRIMARY) */;
}
static void drbd_update_congested(struct drbd_tconn *tconn)
{
	struct sock *sk = tconn->data.socket->sk;
	if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
		set_bit(NET_CONGESTED, &tconn->flags);
}
/* The idea of sendpage seems to be to put some kind of reference
 * to the page into the skb, and to hand it over to the NIC. In
 * this process get_page() gets called.
 *
 * As soon as the page was really sent over the network put_page()
 * gets called by some part of the network layer. [ NIC driver? ]
 *
 * [ get_page() / put_page() increment/decrement the count. If count
 *   reaches 0 the page will be freed. ]
 *
 * This works nicely with pages from FSs.
 * But this means that in protocol A we might signal IO completion too early!
 *
 * In order not to corrupt data during a resync we must make sure
 * that we do not reuse our own buffer pages (EEs) too early, therefore
 * we have the net_ee list.
 *
 * XFS seems to have problems, still, it submits pages with page_count == 0!
 * As a workaround, we disable sendpage on pages
 * with page_count == 0 or PageSlab.
 */
static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
			      int offset, size_t size, unsigned msg_flags)
{
	struct socket *socket;
	void *addr;
	int err;

	socket = mdev->tconn->data.socket;
	addr = kmap(page) + offset;
	err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
	kunmap(page);
	if (!err)
		mdev->send_cnt += size >> 9;
	return err;
}
static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
			   int offset, size_t size, unsigned msg_flags)
{
	struct socket *socket = mdev->tconn->data.socket;
	mm_segment_t oldfs = get_fs();
	int len = size;
	int err = -EIO;

	/* e.g. XFS meta- & log-data is in slab pages, which have a
	 * page_count of 0 and/or have PageSlab() set.
	 * we cannot use send_page for those, as that does get_page();
	 * put_page(); and would cause either a VM_BUG directly, or
	 * __page_cache_release a page that would actually still be referenced
	 * by someone, leading to some obscure delayed Oops somewhere else. */
	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);

	msg_flags |= MSG_NOSIGNAL;
	drbd_update_congested(mdev->tconn);
	set_fs(KERNEL_DS);
	do {
		int sent;

		sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
		if (sent <= 0) {
			if (sent == -EAGAIN) {
				if (we_should_drop_the_connection(mdev->tconn, socket))
					break;
				continue;
			}
			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
				 __func__, (int)size, len, sent);
			if (sent < 0)
				err = sent;
			break;
		}
		len    -= sent;
		offset += sent;
	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
	set_fs(oldfs);
	clear_bit(NET_CONGESTED, &mdev->tconn->flags);

	if (len == 0) {
		err = 0;
		mdev->send_cnt += size >> 9;
	}
	return err;
}
static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
{
	struct bio_vec *bvec;
	int i;

	/* hint all but last page with MSG_MORE */
	__bio_for_each_segment(bvec, bio, i, 0) {
		int err;

		err = _drbd_no_send_page(mdev, bvec->bv_page,
					 bvec->bv_offset, bvec->bv_len,
					 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
		if (err)
			return err;
	}
	return 0;
}

static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
{
	struct bio_vec *bvec;
	int i;

	/* hint all but last page with MSG_MORE */
	__bio_for_each_segment(bvec, bio, i, 0) {
		int err;

		err = _drbd_send_page(mdev, bvec->bv_page,
				      bvec->bv_offset, bvec->bv_len,
				      i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
		if (err)
			return err;
	}
	return 0;
}
static int _drbd_send_zc_ee(struct drbd_conf *mdev,
			    struct drbd_peer_request *peer_req)
{
	struct page *page = peer_req->pages;
	unsigned len = peer_req->i.size;
	int err;

	/* hint all but last page with MSG_MORE */
	page_chain_for_each(page) {
		unsigned l = min_t(unsigned, len, PAGE_SIZE);

		err = _drbd_send_page(mdev, page, 0, l,
				      page_chain_next(page) ? MSG_MORE : 0);
		if (err)
			return err;
		len -= l;
	}
	return 0;
}
static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
{
	if (mdev->tconn->agreed_pro_version >= 95)
		return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
			(bi_rw & REQ_FUA ? DP_FUA : 0) |
			(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
			(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
	else
		return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
}
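/*
 * Example, for illustration: a WRITE_FLUSH_FUA bio, i.e.
 * bi_rw & (REQ_SYNC|REQ_FLUSH|REQ_FUA), maps to
 * DP_RW_SYNC|DP_FLUSH|DP_FUA for agreed_pro_version >= 95, but only to
 * DP_RW_SYNC on older peers, which simply have no wire representation
 * for FUA/FLUSH/DISCARD.
 */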
/* Used to send write requests
 * R_PRIMARY -> Peer	(P_DATA)
 */
int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
{
	struct drbd_socket *sock;
	struct p_data *p;
	unsigned int dp_flags = 0;
	int dgs;
	int err;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;

	if (!p)
		return -EIO;
	p->sector = cpu_to_be64(req->i.sector);
	p->block_id = (unsigned long)req;
	p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
	dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
	if (mdev->state.conn >= C_SYNC_SOURCE &&
	    mdev->state.conn <= C_PAUSED_SYNC_T)
		dp_flags |= DP_MAY_SET_IN_SYNC;
	if (mdev->tconn->agreed_pro_version >= 100) {
		if (req->rq_state & RQ_EXP_RECEIVE_ACK)
			dp_flags |= DP_SEND_RECEIVE_ACK;
		if (req->rq_state & RQ_EXP_WRITE_ACK)
			dp_flags |= DP_SEND_WRITE_ACK;
	}
	p->dp_flags = cpu_to_be32(dp_flags);
	if (dgs)
		drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
	err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
	if (!err) {
		/* For protocol A, we have to memcpy the payload into
		 * socket buffers, as we may complete right away
		 * as soon as we handed it over to tcp, at which point the data
		 * pages may become invalid.
		 *
		 * For data-integrity enabled, we copy it as well, so we can be
		 * sure that even if the bio pages may still be modified, it
		 * won't change the data on the wire, thus if the digest checks
		 * out ok after sending on this side, but does not fit on the
		 * receiving side, we sure have detected corruption elsewhere.
		 */
		if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
			err = _drbd_send_bio(mdev, req->master_bio);
		else
			err = _drbd_send_zc_bio(mdev, req->master_bio);

		/* double check digest, sometimes buffers have been modified in flight. */
		if (dgs > 0 && dgs <= 64) {
			/* 64 byte, 512 bit, is the largest digest size
			 * currently supported in kernel crypto. */
			unsigned char digest[64];
			drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
			if (memcmp(p + 1, digest, dgs)) {
				dev_warn(DEV,
					"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
					(unsigned long long)req->i.sector, req->i.size);
			}
		} /* else if (dgs > 64) {
		     ... Be noisy about digest too large ...
		} */
	}
	mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */

	return err;
}
/* answer packet, used to send data back for read requests:
 *  Peer          -> (diskless) R_PRIMARY  (P_DATA_REPLY)
 *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
 */
int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
		    struct drbd_peer_request *peer_req)
{
	struct drbd_socket *sock;
	struct p_data *p;
	int err;
	int dgs;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;

	if (!p)
		return -EIO;
	p->sector = cpu_to_be64(peer_req->i.sector);
	p->block_id = peer_req->block_id;
	p->seq_num = 0;  /* unused */
	if (dgs)
		drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
	err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
	if (!err)
		err = _drbd_send_zc_ee(mdev, peer_req);
	mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */

	return err;
}
int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
{
	struct drbd_socket *sock;
	struct p_block_desc *p;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->sector = cpu_to_be64(req->i.sector);
	p->blksize = cpu_to_be32(req->i.size);
	return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
}
/*
  drbd_send distinguishes two cases:

  Packets sent via the data socket "sock"
  and packets sent via the meta data socket "msock"

		    sock                      msock
  -----------------+-------------------------+------------------------------
  timeout           conf.timeout / 2          conf.timeout / 2
  timeout action    send a ping via msock     Abort communication
					      and close all sockets
*/

/*
 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
 */
int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
	      void *buf, size_t size, unsigned msg_flags)
{
	struct kvec iov;
	struct msghdr msg;
	int rv, sent = 0;

	if (!sock)
		return -EBADR;

	/* THINK  if (signal_pending) return ... ? */

	iov.iov_base = buf;
	iov.iov_len  = size;

	msg.msg_name       = NULL;
	msg.msg_namelen    = 0;
	msg.msg_control    = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;

	if (sock == tconn->data.socket) {
		rcu_read_lock();
		tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
		rcu_read_unlock();
		drbd_update_congested(tconn);
	}
	do {
		/* STRANGE
		 * tcp_sendmsg does _not_ use its size parameter at all ?
		 *
		 * -EAGAIN on timeout, -EINTR on signal.
		 */
/* THINK
 * do we need to block DRBD_SIG if sock == &meta.socket ??
 * otherwise wake_asender() might interrupt some send_*Ack !
 */
		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
		if (rv == -EAGAIN) {
			if (we_should_drop_the_connection(tconn, sock))
				break;
			else
				continue;
		}
		if (rv == -EINTR) {
			flush_signals(current);
			rv = 0;
		}
		if (rv < 0)
			break;
		sent += rv;
		iov.iov_base += rv;
		iov.iov_len  -= rv;
	} while (sent < size);

	if (sock == tconn->data.socket)
		clear_bit(NET_CONGESTED, &tconn->flags);

	if (rv <= 0) {
		if (rv != -EAGAIN) {
			conn_err(tconn, "%s_sendmsg returned %d\n",
				 sock == tconn->meta.socket ? "msock" : "sock",
				 rv);
			conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
		} else
			conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
	}

	return sent;
}
/**
 * drbd_send_all  -  Send an entire buffer
 *
 * Returns 0 upon success and a negative error value otherwise.
 */
int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
		  size_t size, unsigned msg_flags)
{
	int err;

	err = drbd_send(tconn, sock, buffer, size, msg_flags);
	if (err < 0)
		return err;
	if (err != size)
		return -EIO;
	return 0;
}
static int drbd_open(struct block_device *bdev, fmode_t mode)
{
	struct drbd_conf *mdev = bdev->bd_disk->private_data;
	unsigned long flags;
	int rv = 0;

	mutex_lock(&drbd_main_mutex);
	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	/* to have a stable mdev->state.role
	 * and no race with updating open_cnt */

	if (mdev->state.role != R_PRIMARY) {
		if (mode & FMODE_WRITE)
			rv = -EROFS;
		else if (!allow_oos)
			rv = -EMEDIUMTYPE;
	}

	if (!rv)
		mdev->open_cnt++;
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
	mutex_unlock(&drbd_main_mutex);

	return rv;
}
static int drbd_release(struct gendisk *gd, fmode_t mode)
{
	struct drbd_conf *mdev = gd->private_data;
	mutex_lock(&drbd_main_mutex);
	mdev->open_cnt--;
	mutex_unlock(&drbd_main_mutex);
	return 0;
}
static void drbd_set_defaults(struct drbd_conf *mdev)
{
	/* Beware! The actual layout differs
	 * between big endian and little endian */
	mdev->state = (union drbd_dev_state) {
		{ .role = R_SECONDARY,
		  .peer = R_UNKNOWN,
		  .conn = C_STANDALONE,
		  .disk = D_DISKLESS,
		  .pdsk = D_UNKNOWN,
		} };
}
void drbd_init_set_defaults(struct drbd_conf *mdev)
{
	/* the memset(,0,) did most of this.
	 * note: only assignments, no allocation in here */

	drbd_set_defaults(mdev);

	atomic_set(&mdev->ap_bio_cnt, 0);
	atomic_set(&mdev->ap_pending_cnt, 0);
	atomic_set(&mdev->rs_pending_cnt, 0);
	atomic_set(&mdev->unacked_cnt, 0);
	atomic_set(&mdev->local_cnt, 0);
	atomic_set(&mdev->pp_in_use_by_net, 0);
	atomic_set(&mdev->rs_sect_in, 0);
	atomic_set(&mdev->rs_sect_ev, 0);
	atomic_set(&mdev->ap_in_flight, 0);
	atomic_set(&mdev->md_io_in_use, 0);

	mutex_init(&mdev->own_state_mutex);
	mdev->state_mutex = &mdev->own_state_mutex;

	spin_lock_init(&mdev->al_lock);
	spin_lock_init(&mdev->peer_seq_lock);
	spin_lock_init(&mdev->epoch_lock);

	INIT_LIST_HEAD(&mdev->active_ee);
	INIT_LIST_HEAD(&mdev->sync_ee);
	INIT_LIST_HEAD(&mdev->done_ee);
	INIT_LIST_HEAD(&mdev->read_ee);
	INIT_LIST_HEAD(&mdev->net_ee);
	INIT_LIST_HEAD(&mdev->resync_reads);
	INIT_LIST_HEAD(&mdev->resync_work.list);
	INIT_LIST_HEAD(&mdev->unplug_work.list);
	INIT_LIST_HEAD(&mdev->go_diskless.list);
	INIT_LIST_HEAD(&mdev->md_sync_work.list);
	INIT_LIST_HEAD(&mdev->start_resync_work.list);
	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);

	mdev->resync_work.cb  = w_resync_timer;
	mdev->unplug_work.cb  = w_send_write_hint;
	mdev->go_diskless.cb  = w_go_diskless;
	mdev->md_sync_work.cb = w_md_sync;
	mdev->bm_io_work.w.cb = w_bitmap_io;
	mdev->start_resync_work.cb = w_start_resync;

	mdev->resync_work.mdev  = mdev;
	mdev->unplug_work.mdev  = mdev;
	mdev->go_diskless.mdev  = mdev;
	mdev->md_sync_work.mdev = mdev;
	mdev->bm_io_work.w.mdev = mdev;
	mdev->start_resync_work.mdev = mdev;

	init_timer(&mdev->resync_timer);
	init_timer(&mdev->md_sync_timer);
	init_timer(&mdev->start_resync_timer);
	init_timer(&mdev->request_timer);
	mdev->resync_timer.function = resync_timer_fn;
	mdev->resync_timer.data = (unsigned long) mdev;
	mdev->md_sync_timer.function = md_sync_timer_fn;
	mdev->md_sync_timer.data = (unsigned long) mdev;
	mdev->start_resync_timer.function = start_resync_timer_fn;
	mdev->start_resync_timer.data = (unsigned long) mdev;
	mdev->request_timer.function = request_timer_fn;
	mdev->request_timer.data = (unsigned long) mdev;

	init_waitqueue_head(&mdev->misc_wait);
	init_waitqueue_head(&mdev->state_wait);
	init_waitqueue_head(&mdev->ee_wait);
	init_waitqueue_head(&mdev->al_wait);
	init_waitqueue_head(&mdev->seq_wait);

	mdev->write_ordering = WO_bdev_flush;
	mdev->resync_wenr = LC_FREE;
	mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
	mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
}
void drbd_mdev_cleanup(struct drbd_conf *mdev)
{
	int i;

	if (mdev->tconn->receiver.t_state != NONE)
		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
			mdev->tconn->receiver.t_state);

	/* no need to lock it, I'm the only thread alive */
	if (atomic_read(&mdev->current_epoch->epoch_size) != 0)
		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));

	mdev->rs_failed = 0;
	mdev->rs_last_events = 0;
	mdev->rs_last_sect_ev = 0;
	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
		mdev->rs_mark_left[i] = 0;
		mdev->rs_mark_time[i] = 0;
	}
	D_ASSERT(mdev->tconn->net_conf == NULL);

	drbd_set_my_capacity(mdev, 0);
	if (mdev->bitmap) {
		/* maybe never allocated. */
		drbd_bm_resize(mdev, 0, 1);
		drbd_bm_cleanup(mdev);
	}

	drbd_free_bc(mdev->ldev);
	mdev->ldev = NULL;

	clear_bit(AL_SUSPENDED, &mdev->flags);

	D_ASSERT(list_empty(&mdev->active_ee));
	D_ASSERT(list_empty(&mdev->sync_ee));
	D_ASSERT(list_empty(&mdev->done_ee));
	D_ASSERT(list_empty(&mdev->read_ee));
	D_ASSERT(list_empty(&mdev->net_ee));
	D_ASSERT(list_empty(&mdev->resync_reads));
	D_ASSERT(list_empty(&mdev->tconn->data.work.q));
	D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
	D_ASSERT(list_empty(&mdev->resync_work.list));
	D_ASSERT(list_empty(&mdev->unplug_work.list));
	D_ASSERT(list_empty(&mdev->go_diskless.list));

	drbd_set_defaults(mdev);
}
static void drbd_destroy_mempools(void)
{
	struct page *page;

	while (drbd_pp_pool) {
		page = drbd_pp_pool;
		drbd_pp_pool = (struct page *)page_private(page);
		__free_page(page);
		drbd_pp_vacant--;
	}

	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */

	if (drbd_md_io_bio_set)
		bioset_free(drbd_md_io_bio_set);
	if (drbd_md_io_page_pool)
		mempool_destroy(drbd_md_io_page_pool);
	if (drbd_ee_mempool)
		mempool_destroy(drbd_ee_mempool);
	if (drbd_request_mempool)
		mempool_destroy(drbd_request_mempool);
	if (drbd_ee_cache)
		kmem_cache_destroy(drbd_ee_cache);
	if (drbd_request_cache)
		kmem_cache_destroy(drbd_request_cache);
	if (drbd_bm_ext_cache)
		kmem_cache_destroy(drbd_bm_ext_cache);
	if (drbd_al_ext_cache)
		kmem_cache_destroy(drbd_al_ext_cache);

	drbd_md_io_bio_set   = NULL;
	drbd_md_io_page_pool = NULL;
	drbd_ee_mempool      = NULL;
	drbd_request_mempool = NULL;
	drbd_ee_cache        = NULL;
	drbd_request_cache   = NULL;
	drbd_bm_ext_cache    = NULL;
	drbd_al_ext_cache    = NULL;
}
static int drbd_create_mempools(void)
{
	struct page *page;
	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
	int i;

	/* prepare our caches and mempools */
	drbd_request_mempool = NULL;
	drbd_ee_cache        = NULL;
	drbd_request_cache   = NULL;
	drbd_bm_ext_cache    = NULL;
	drbd_al_ext_cache    = NULL;
	drbd_pp_pool         = NULL;
	drbd_md_io_page_pool = NULL;
	drbd_md_io_bio_set   = NULL;

	/* caches */
	drbd_request_cache = kmem_cache_create(
		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
	if (drbd_request_cache == NULL)
		goto Enomem;

	drbd_ee_cache = kmem_cache_create(
		"drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
	if (drbd_ee_cache == NULL)
		goto Enomem;

	drbd_bm_ext_cache = kmem_cache_create(
		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
	if (drbd_bm_ext_cache == NULL)
		goto Enomem;

	drbd_al_ext_cache = kmem_cache_create(
		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
	if (drbd_al_ext_cache == NULL)
		goto Enomem;

	/* mempools */
	drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
	if (drbd_md_io_bio_set == NULL)
		goto Enomem;

	drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
	if (drbd_md_io_page_pool == NULL)
		goto Enomem;

	drbd_request_mempool = mempool_create(number,
		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
	if (drbd_request_mempool == NULL)
		goto Enomem;

	drbd_ee_mempool = mempool_create(number,
		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
	if (drbd_ee_mempool == NULL)
		goto Enomem;

	/* drbd's page pool */
	spin_lock_init(&drbd_pp_lock);

	for (i = 0; i < number; i++) {
		page = alloc_page(GFP_HIGHUSER);
		if (!page)
			goto Enomem;
		set_page_private(page, (unsigned long)drbd_pp_pool);
		drbd_pp_pool = page;
	}
	drbd_pp_vacant = number;

	return 0;

Enomem:
	drbd_destroy_mempools(); /* in case we allocated some */
	return -ENOMEM;
}
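/* Illustrative sizing note (not from the original source): "number" above
 * scales the page pool and the two mempools with minor_count.  Assuming a
 * DRBD_MAX_BIO_SIZE of 1 MiB and 4 KiB pages, that pre-allocates
 *
 *	(1 << 20) / (4 << 10) * minor_count  ==  256 * minor_count
 *
 * pages, enough to absorb one maximally sized bio per configured device
 * without falling back to on-demand allocation.
 */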
static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
			   void *unused)
{
	/* just so we have it. you never know what interesting things we
	 * might want to do here some day...
	 */

	return NOTIFY_DONE;
}

static struct notifier_block drbd_notifier = {
	.notifier_call = drbd_notify_sys,
};
static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
{
	int rr;

	rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
	if (rr)
		dev_err(DEV, "%d EEs in active list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
	if (rr)
		dev_err(DEV, "%d EEs in sync list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
	if (rr)
		dev_err(DEV, "%d EEs in read list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
	if (rr)
		dev_err(DEV, "%d EEs in done list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
	if (rr)
		dev_err(DEV, "%d EEs in net list found!\n", rr);
}
/* caution. no locking. */
void drbd_minor_destroy(struct kref *kref)
{
	struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
	struct drbd_tconn *tconn = mdev->tconn;

	del_timer_sync(&mdev->request_timer);

	/* paranoia asserts */
	D_ASSERT(mdev->open_cnt == 0);
	D_ASSERT(list_empty(&mdev->tconn->data.work.q));
	/* end paranoia asserts */

	/* cleanup stuff that may have been allocated during
	 * device (re-)configuration or state changes */

	if (mdev->this_bdev)
		bdput(mdev->this_bdev);

	drbd_free_bc(mdev->ldev);
	mdev->ldev = NULL;

	drbd_release_all_peer_reqs(mdev);

	lc_destroy(mdev->act_log);
	lc_destroy(mdev->resync);

	kfree(mdev->p_uuid);
	/* mdev->p_uuid = NULL; */

	kfree(mdev->current_epoch);
	if (mdev->bitmap) /* should no longer be there. */
		drbd_bm_cleanup(mdev);
	__free_page(mdev->md_io_page);
	put_disk(mdev->vdisk);
	blk_cleanup_queue(mdev->rq_queue);
	kfree(mdev->rs_plan_s);
	kfree(mdev);

	kref_put(&tconn->kref, &conn_destroy);
}
static void drbd_cleanup(void)
{
	unsigned int i;
	struct drbd_conf *mdev;
	struct drbd_tconn *tconn, *tmp;

	unregister_reboot_notifier(&drbd_notifier);

	/* first remove proc,
	 * drbdsetup uses its presence to detect
	 * whether DRBD is loaded.
	 * If we would get stuck in proc removal,
	 * but have netlink already deregistered,
	 * some drbdsetup commands may wait forever
	 * for an answer.
	 */
	if (drbd_proc)
		remove_proc_entry("drbd", NULL);

	drbd_genl_unregister();

	idr_for_each_entry(&minors, mdev, i) {
		idr_remove(&minors, mdev_to_minor(mdev));
		idr_remove(&mdev->tconn->volumes, mdev->vnr);
		del_gendisk(mdev->vdisk);
		/* synchronize_rcu(); No other threads running at this point */
		kref_put(&mdev->kref, &drbd_minor_destroy);
	}

	/* not _rcu since, no other updater anymore. Genl already unregistered */
	list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
		list_del(&tconn->all_tconn); /* not _rcu no proc, not other threads */
		/* synchronize_rcu(); */
		kref_put(&tconn->kref, &conn_destroy);
	}

	drbd_destroy_mempools();
	unregister_blkdev(DRBD_MAJOR, "drbd");

	idr_destroy(&minors);

	printk(KERN_INFO "drbd: module cleanup done.\n");
}
/**
 * drbd_congested() - Callback for pdflush
 * @congested_data:	User data
 * @bdi_bits:		Bits pdflush is currently interested in
 *
 * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
 */
static int drbd_congested(void *congested_data, int bdi_bits)
{
	struct drbd_conf *mdev = congested_data;
	struct request_queue *q;
	char reason = '-';
	int r = 0;

	if (!may_inc_ap_bio(mdev)) {
		/* DRBD has frozen IO */
		r = bdi_bits;
		reason = 'd';
		goto out;
	}

	if (get_ldev(mdev)) {
		q = bdev_get_queue(mdev->ldev->backing_bdev);
		r = bdi_congested(&q->backing_dev_info, bdi_bits);
		put_ldev(mdev);
		if (r)
			reason = 'b';
	}

	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
		r |= (1 << BDI_async_congested);
		reason = reason == 'b' ? 'a' : 'n';
	}

out:
	mdev->congestion_reason = reason;
	return r;
}
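/* Summary of the congestion_reason codes set above, derived from the
 * function body: '-' not congested, 'd' IO frozen by DRBD, 'b' local
 * backing device congested, 'n' network congested, 'a' both backing
 * device and network congested.
 */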
static void drbd_init_workqueue(struct drbd_work_queue* wq)
{
	sema_init(&wq->s, 0);
	spin_lock_init(&wq->q_lock);
	INIT_LIST_HEAD(&wq->q);
}
struct drbd_tconn *conn_get_by_name(const char *name)
{
	struct drbd_tconn *tconn;

	if (!name || !name[0])
		return NULL;

	rcu_read_lock();
	list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
		if (!strcmp(tconn->name, name)) {
			kref_get(&tconn->kref);
			goto found;
		}
	}
	tconn = NULL;
found:
	rcu_read_unlock();
	return tconn;
}
struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
				     void *peer_addr, int peer_addr_len)
{
	struct drbd_tconn *tconn;

	rcu_read_lock();
	list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
		if (tconn->my_addr_len == my_addr_len &&
		    tconn->peer_addr_len == peer_addr_len &&
		    !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
		    !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
			kref_get(&tconn->kref);
			goto found;
		}
	}
	tconn = NULL;
found:
	rcu_read_unlock();
	return tconn;
}
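/* Usage sketch (illustrative only): both lookup helpers above return the
 * tconn with an extra kref taken, so the caller owns a reference and must
 * drop it when done; "r0" is just a hypothetical resource name:
 *
 *	struct drbd_tconn *tconn = conn_get_by_name("r0");
 *	if (tconn) {
 *		...use tconn...
 *		kref_put(&tconn->kref, &conn_destroy);
 *	}
 */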
static int drbd_alloc_socket(struct drbd_socket *socket)
{
	socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
	if (!socket->rbuf)
		return -ENOMEM;
	socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
	if (!socket->sbuf)
		return -ENOMEM;
	return 0;
}

static void drbd_free_socket(struct drbd_socket *socket)
{
	free_page((unsigned long) socket->sbuf);
	free_page((unsigned long) socket->rbuf);
}
void conn_free_crypto(struct drbd_tconn *tconn)
{
	drbd_free_sock(tconn);

	crypto_free_hash(tconn->csums_tfm);
	crypto_free_hash(tconn->verify_tfm);
	crypto_free_hash(tconn->cram_hmac_tfm);
	crypto_free_hash(tconn->integrity_tfm);
	crypto_free_hash(tconn->peer_integrity_tfm);
	kfree(tconn->int_dig_in);
	kfree(tconn->int_dig_vv);

	tconn->csums_tfm = NULL;
	tconn->verify_tfm = NULL;
	tconn->cram_hmac_tfm = NULL;
	tconn->integrity_tfm = NULL;
	tconn->peer_integrity_tfm = NULL;
	tconn->int_dig_in = NULL;
	tconn->int_dig_vv = NULL;
}
int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
{
	cpumask_var_t new_cpu_mask;
	int err;

	if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
		return -ENOMEM;
		/*
		retcode = ERR_NOMEM;
		drbd_msg_put_info("unable to allocate cpumask");
		*/

	/* silently ignore cpu mask on UP kernel */
	if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
		/* FIXME: Get rid of constant 32 here */
		err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
				     cpumask_bits(new_cpu_mask), nr_cpu_ids);
		if (err) {
			conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
			/* retcode = ERR_CPU_MASK_PARSE; */
			goto fail;
		}
	}
	tconn->res_opts = *res_opts;
	if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
		cpumask_copy(tconn->cpu_mask, new_cpu_mask);
		drbd_calc_cpu_mask(tconn);
		tconn->receiver.reset_cpu_mask = 1;
		tconn->asender.reset_cpu_mask = 1;
		tconn->worker.reset_cpu_mask = 1;
	}
	err = 0;

fail:
	free_cpumask_var(new_cpu_mask);
	return err;
}
/* caller must be under genl_lock() */
struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
{
	struct drbd_tconn *tconn;

	tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
	if (!tconn)
		return NULL;

	tconn->name = kstrdup(name, GFP_KERNEL);
	if (!tconn->name)
		goto fail;

	if (drbd_alloc_socket(&tconn->data))
		goto fail;
	if (drbd_alloc_socket(&tconn->meta))
		goto fail;

	if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
		goto fail;

	if (set_resource_options(tconn, res_opts))
		goto fail;

	if (!tl_init(tconn))
		goto fail;

	tconn->cstate = C_STANDALONE;
	mutex_init(&tconn->cstate_mutex);
	spin_lock_init(&tconn->req_lock);
	mutex_init(&tconn->conf_update);
	init_waitqueue_head(&tconn->ping_wait);
	idr_init(&tconn->volumes);

	drbd_init_workqueue(&tconn->data.work);
	mutex_init(&tconn->data.mutex);

	drbd_init_workqueue(&tconn->meta.work);
	mutex_init(&tconn->meta.mutex);

	drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
	drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
	drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");

	kref_init(&tconn->kref);
	list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);

	return tconn;

fail:
	tl_cleanup(tconn);
	free_cpumask_var(tconn->cpu_mask);
	drbd_free_socket(&tconn->meta);
	drbd_free_socket(&tconn->data);
	kfree(tconn->name);
	kfree(tconn);

	return NULL;
}
void conn_destroy(struct kref *kref)
{
	struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);

	idr_destroy(&tconn->volumes);

	free_cpumask_var(tconn->cpu_mask);
	drbd_free_socket(&tconn->meta);
	drbd_free_socket(&tconn->data);
	kfree(tconn->name);
	kfree(tconn->int_dig_in);
	kfree(tconn->int_dig_vv);
	kfree(tconn);
}
enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
{
	struct drbd_conf *mdev;
	struct gendisk *disk;
	struct request_queue *q;
	int vnr_got = vnr;
	int minor_got = minor;
	enum drbd_ret_code err = ERR_NOMEM;

	mdev = minor_to_mdev(minor);
	if (mdev)
		return ERR_MINOR_EXISTS;

	/* GFP_KERNEL, we are outside of all write-out paths */
	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
	if (!mdev)
		return ERR_NOMEM;

	kref_get(&tconn->kref);
	mdev->tconn = tconn;

	mdev->minor = minor;
	mdev->vnr = vnr;

	drbd_init_set_defaults(mdev);

	q = blk_alloc_queue(GFP_KERNEL);
	if (!q)
		goto out_no_q;
	mdev->rq_queue = q;
	q->queuedata = mdev;

	disk = alloc_disk(1);
	if (!disk)
		goto out_no_disk;
	mdev->vdisk = disk;

	set_disk_ro(disk, true);

	disk->queue = q;
	disk->major = DRBD_MAJOR;
	disk->first_minor = minor;
	disk->fops = &drbd_ops;
	sprintf(disk->disk_name, "drbd%d", minor);
	disk->private_data = mdev;

	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
	/* we have no partitions. we contain only ourselves. */
	mdev->this_bdev->bd_contains = mdev->this_bdev;

	q->backing_dev_info.congested_fn = drbd_congested;
	q->backing_dev_info.congested_data = mdev;

	blk_queue_make_request(q, drbd_make_request);
	/* Setting the max_hw_sectors to an odd value of 8kibyte here
	   This triggers a max_bio_size message upon first attach or connect */
	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
	blk_queue_merge_bvec(q, drbd_merge_bvec);
	q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */

	mdev->md_io_page = alloc_page(GFP_KERNEL);
	if (!mdev->md_io_page)
		goto out_no_io_page;

	if (drbd_bm_init(mdev))
		goto out_no_bitmap;
	mdev->read_requests = RB_ROOT;
	mdev->write_requests = RB_ROOT;

	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
	if (!mdev->current_epoch)
		goto out_no_epoch;

	INIT_LIST_HEAD(&mdev->current_epoch->list);
	mdev->epochs = 1;

	if (!idr_pre_get(&minors, GFP_KERNEL))
		goto out_no_minor_idr;
	if (idr_get_new_above(&minors, mdev, minor, &minor_got))
		goto out_no_minor_idr;
	if (minor_got != minor) {
		err = ERR_MINOR_EXISTS;
		drbd_msg_put_info("requested minor exists already");
		goto out_idr_remove_minor;
	}

	if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
		goto out_idr_remove_minor;
	if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
		goto out_idr_remove_minor;
	if (vnr_got != vnr) {
		err = ERR_INVALID_REQUEST;
		drbd_msg_put_info("requested volume exists already");
		goto out_idr_remove_vol;
	}
	add_disk(disk);
	kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */

	/* inherit the connection state */
	mdev->state.conn = tconn->cstate;
	if (mdev->state.conn == C_WF_REPORT_PARAMS)
		drbd_connected(mdev);

	return NO_ERROR;

out_idr_remove_vol:
	idr_remove(&tconn->volumes, vnr_got);
out_idr_remove_minor:
	idr_remove(&minors, minor_got);
	synchronize_rcu();
out_no_minor_idr:
	kfree(mdev->current_epoch);
out_no_epoch:
	drbd_bm_cleanup(mdev);
out_no_bitmap:
	__free_page(mdev->md_io_page);
out_no_io_page:
	put_disk(disk);
out_no_disk:
	blk_cleanup_queue(q);
out_no_q:
	kfree(mdev);
	kref_put(&tconn->kref, &conn_destroy);
	return err;
}
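/* Note on the error path above: the out_* labels unwind in exactly the
 * reverse order of allocation, the usual kernel goto-ladder pattern, so
 * each failure point skips the cleanup steps for resources that were not
 * yet acquired.
 */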
int __init drbd_init(void)
{
	int err;

	if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
		printk(KERN_ERR
		       "drbd: invalid minor_count (%d)\n", minor_count);
#ifdef MODULE
		return -EINVAL;
#else
		minor_count = DRBD_MINOR_COUNT_DEF;
#endif
	}

	err = register_blkdev(DRBD_MAJOR, "drbd");
	if (err) {
		printk(KERN_ERR
		       "drbd: unable to register block device major %d\n",
		       DRBD_MAJOR);
		return err;
	}

	err = drbd_genl_register();
	if (err) {
		printk(KERN_ERR "drbd: unable to register generic netlink family\n");
		goto fail;
	}

	register_reboot_notifier(&drbd_notifier);

	/*
	 * allocate all necessary structs
	 */
	err = -ENOMEM;

	init_waitqueue_head(&drbd_pp_wait);

	drbd_proc = NULL; /* play safe for drbd_cleanup */
	idr_init(&minors);

	err = drbd_create_mempools();
	if (err)
		goto fail;

	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
	if (!drbd_proc) {
		printk(KERN_ERR "drbd: unable to register proc file\n");
		goto fail;
	}

	rwlock_init(&global_state_lock);
	INIT_LIST_HEAD(&drbd_tconns);

	printk(KERN_INFO "drbd: initialized. "
	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
	printk(KERN_INFO "drbd: registered as block device major %d\n",
		DRBD_MAJOR);

	return 0; /* Success! */

fail:
	drbd_cleanup();
	if (err == -ENOMEM)
		/* currently always the case */
		printk(KERN_ERR "drbd: ran out of memory\n");
	else
		printk(KERN_ERR "drbd: initialization failure\n");
	return err;
}
void drbd_free_bc(struct drbd_backing_dev *ldev)
{
	if (ldev == NULL)
		return;

	blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
	blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);

	kfree(ldev);
}
void drbd_free_sock(struct drbd_tconn *tconn)
{
	if (tconn->data.socket) {
		mutex_lock(&tconn->data.mutex);
		kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
		sock_release(tconn->data.socket);
		tconn->data.socket = NULL;
		mutex_unlock(&tconn->data.mutex);
	}
	if (tconn->meta.socket) {
		mutex_lock(&tconn->meta.mutex);
		kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
		sock_release(tconn->meta.socket);
		tconn->meta.socket = NULL;
		mutex_unlock(&tconn->meta.mutex);
	}
}
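/* Note on the pattern above: kernel_sock_shutdown(SHUT_RDWR) is issued
 * before sock_release() so that any thread still blocked in a send or
 * receive on the socket returns with an error before the socket is freed;
 * the per-socket mutex serializes this against concurrent senders.
 */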
/* meta data management */

struct meta_data_on_disk {
	u64 la_size;           /* last agreed size. */
	u64 uuid[UI_SIZE];     /* UUIDs. */
	u64 device_uuid;
	u64 reserved_u64_1;
	u32 flags;             /* MDF */
	u32 magic;
	u32 md_size_sect;
	u32 al_offset;         /* offset to this block */
	u32 al_nr_extents;     /* important for restoring the AL */
	      /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
	u32 bm_offset;         /* offset to the bitmap, from here */
	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
	u32 reserved_u32[3];

} __packed;
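/* Every multi-byte field of this on-disk block is stored big-endian:
 * drbd_md_sync() below converts with cpu_to_be32()/cpu_to_be64() on write,
 * and drbd_md_read() converts back with be32_to_cpu()/be64_to_cpu(), so
 * meta data written on a little-endian node reads correctly on a
 * big-endian peer.  The whole block fits in the single 512-byte sector at
 * md_offset.
 */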
/**
 * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
 * @mdev:	DRBD device.
 */
void drbd_md_sync(struct drbd_conf *mdev)
{
	struct meta_data_on_disk *buffer;
	sector_t sector;
	int i;

	del_timer(&mdev->md_sync_timer);
	/* timer may be rearmed by drbd_md_mark_dirty() now. */
	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
		return;

	/* We use here D_FAILED and not D_ATTACHING because we try to write
	 * metadata even if we detach due to a disk failure! */
	if (!get_ldev_if_state(mdev, D_FAILED))
		return;

	buffer = drbd_md_get_buffer(mdev);
	if (!buffer)
		goto out;

	memset(buffer, 0, 512);

	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
	for (i = UI_CURRENT; i < UI_SIZE; i++)
		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);

	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);

	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);

	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
	sector = mdev->ldev->md.md_offset;

	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
		/* this was a try anyways ... */
		dev_err(DEV, "meta data update failed!\n");
		drbd_chk_io_error(mdev, 1, true);
	}

	/* Update mdev->ldev->md.la_size_sect,
	 * since we updated it on metadata. */
	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);

	drbd_md_put_buffer(mdev);
out:
	put_ldev(mdev);
}
/**
 * drbd_md_read() - Reads in the meta data super block
 * @mdev:	DRBD device.
 * @bdev:	Device from which the meta data should be read in.
 *
 * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
 * something goes wrong.
 */
int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
{
	struct meta_data_on_disk *buffer;
	u32 magic, flags;
	int i, rv = NO_ERROR;

	if (!get_ldev_if_state(mdev, D_ATTACHING))
		return ERR_IO_MD_DISK;

	buffer = drbd_md_get_buffer(mdev);
	if (!buffer)
		goto out;

	if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
		/* NOTE: can't do normal error processing here as this is
		   called BEFORE disk is attached */
		dev_err(DEV, "Error while reading metadata.\n");
		rv = ERR_IO_MD_DISK;
		goto err;
	}

	magic = be32_to_cpu(buffer->magic);
	flags = be32_to_cpu(buffer->flags);
	if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
	    (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
			/* btw: that's Activity Log clean, not "all" clean. */
		dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
		rv = ERR_MD_UNCLEAN;
		goto err;
	}
	if (magic != DRBD_MD_MAGIC_08) {
		if (magic == DRBD_MD_MAGIC_07)
			dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
		else
			dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
		rv = ERR_MD_INVALID;
		goto err;
	}
	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
		rv = ERR_MD_INVALID;
		goto err;
	}
	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
		rv = ERR_MD_INVALID;
		goto err;
	}
	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
		rv = ERR_MD_INVALID;
		goto err;
	}

	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
		rv = ERR_MD_INVALID;
		goto err;
	}

	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
	for (i = UI_CURRENT; i < UI_SIZE; i++)
		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
	bdev->md.flags = be32_to_cpu(buffer->flags);
	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);

	spin_lock_irq(&mdev->tconn->req_lock);
	if (mdev->state.conn < C_CONNECTED) {
		int peer;
		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
		peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
		mdev->peer_max_bio_size = peer;
	}
	spin_unlock_irq(&mdev->tconn->req_lock);

err:
	drbd_md_put_buffer(mdev);
out:
	put_ldev(mdev);

	return rv;
}
/**
 * drbd_md_mark_dirty() - Mark meta data super block as dirty
 * @mdev:	DRBD device.
 *
 * Call this function if you change anything that should be written to
 * the meta-data super block. This function sets MD_DIRTY, and starts a
 * timer that ensures that within five seconds you have to call drbd_md_sync().
 */
#ifdef DEBUG
void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
{
	if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
		mod_timer(&mdev->md_sync_timer, jiffies + HZ);
		mdev->last_md_mark_dirty.line = line;
		mdev->last_md_mark_dirty.func = func;
	}
}
#else
void drbd_md_mark_dirty(struct drbd_conf *mdev)
{
	if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
		mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
}
#endif
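/* Usage sketch (illustrative only): callers mutate the in-core meta data,
 * mark it dirty, and rely on the timer to batch the write-out:
 *
 *	mdev->ldev->md.flags |= MDF_FULL_SYNC;
 *	drbd_md_mark_dirty(mdev);
 *	...
 *	drbd_md_sync(mdev);	(force the write-out now instead of in 5s)
 *
 * The flag helpers drbd_md_set_flag()/drbd_md_clear_flag() further below
 * follow exactly this pattern.
 */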
static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
{
	int i;

	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
}
void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
{
	if (idx == UI_CURRENT) {
		if (mdev->state.role == R_PRIMARY)
			val |= 1;
		else
			val &= ~((u64)1);

		drbd_set_ed_uuid(mdev, val);
	}

	mdev->ldev->md.uuid[idx] = val;
	drbd_md_mark_dirty(mdev);
}
void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
{
	if (mdev->ldev->md.uuid[idx]) {
		drbd_uuid_move_history(mdev);
		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
	}
	_drbd_uuid_set(mdev, idx, val);
}
/**
 * drbd_uuid_new_current() - Creates a new current UUID
 * @mdev:	DRBD device.
 *
 * Creates a new current UUID, and rotates the old current UUID into
 * the bitmap slot. Causes an incremental resync upon next connect.
 */
void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
{
	u64 val;
	unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];

	if (bm_uuid)
		dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);

	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];

	get_random_bytes(&val, sizeof(u64));
	_drbd_uuid_set(mdev, UI_CURRENT, val);
	drbd_print_uuids(mdev, "new current UUID");
	/* get it to stable storage _now_ */
	drbd_md_sync(mdev);
}
void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
{
	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
		return;

	if (val == 0) {
		drbd_uuid_move_history(mdev);
		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
		mdev->ldev->md.uuid[UI_BITMAP] = 0;
	} else {
		unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
		if (bm_uuid)
			dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);

		mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
	}
	drbd_md_mark_dirty(mdev);
}
/**
 * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
 * @mdev:	DRBD device.
 *
 * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
 */
int drbd_bmio_set_n_write(struct drbd_conf *mdev)
{
	int rv = -EIO;

	if (get_ldev_if_state(mdev, D_ATTACHING)) {
		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
		drbd_md_sync(mdev);
		drbd_bm_set_all(mdev);

		rv = drbd_bm_write(mdev);

		if (!rv) {
			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
			drbd_md_sync(mdev);
		}

		put_ldev(mdev);
	}

	return rv;
}
/**
 * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
 * @mdev:	DRBD device.
 *
 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
 */
int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
{
	int rv = -EIO;

	drbd_resume_al(mdev);
	if (get_ldev_if_state(mdev, D_ATTACHING)) {
		drbd_bm_clear_all(mdev);
		rv = drbd_bm_write(mdev);
		put_ldev(mdev);
	}

	return rv;
}
static int w_bitmap_io(struct drbd_work *w, int unused)
{
	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
	struct drbd_conf *mdev = w->mdev;
	int rv = -EIO;

	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);

	if (get_ldev(mdev)) {
		drbd_bm_lock(mdev, work->why, work->flags);
		rv = work->io_fn(mdev);
		drbd_bm_unlock(mdev);
		put_ldev(mdev);
	}

	clear_bit_unlock(BITMAP_IO, &mdev->flags);
	wake_up(&mdev->misc_wait);

	if (work->done)
		work->done(mdev, rv);

	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
	work->why = NULL;
	work->flags = 0;

	return 0;
}
void drbd_ldev_destroy(struct drbd_conf *mdev)
{
	lc_destroy(mdev->resync);
	mdev->resync = NULL;
	lc_destroy(mdev->act_log);
	mdev->act_log = NULL;
	__no_warn(local,
		drbd_free_bc(mdev->ldev);
		mdev->ldev = NULL;);

	clear_bit(GO_DISKLESS, &mdev->flags);
}
static int w_go_diskless(struct drbd_work *w, int unused)
{
	struct drbd_conf *mdev = w->mdev;

	D_ASSERT(mdev->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */
	drbd_force_state(mdev, NS(disk, D_DISKLESS));
	return 0;
}
void drbd_go_diskless(struct drbd_conf *mdev)
{
	D_ASSERT(mdev->state.disk == D_FAILED);
	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
		drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
}
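/* Note: the test_and_set_bit(GO_DISKLESS) makes the queueing idempotent,
 * so repeated calls while the work is still pending queue the state change
 * only once; drbd_ldev_destroy() above clears the bit again once the
 * device has actually become D_DISKLESS.
 */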
/**
 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
 * @mdev:	DRBD device.
 * @io_fn:	IO callback to be called when bitmap IO is possible
 * @done:	callback to be called after the bitmap IO was performed
 * @why:	Descriptive text of the reason for doing the IO
 *
 * While IO on the bitmap happens we freeze application IO, thus ensuring
 * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
 * called from worker context. It MUST NOT be used while a previous such
 * work is still pending!
 */
void drbd_queue_bitmap_io(struct drbd_conf *mdev,
			  int (*io_fn)(struct drbd_conf *),
			  void (*done)(struct drbd_conf *, int),
			  char *why, enum bm_flag flags)
{
	D_ASSERT(current == mdev->tconn->worker.task);

	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
	if (mdev->bm_io_work.why)
		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
			why, mdev->bm_io_work.why);

	mdev->bm_io_work.io_fn = io_fn;
	mdev->bm_io_work.done = done;
	mdev->bm_io_work.why = why;
	mdev->bm_io_work.flags = flags;

	spin_lock_irq(&mdev->tconn->req_lock);
	set_bit(BITMAP_IO, &mdev->flags);
	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
			drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
	}
	spin_unlock_irq(&mdev->tconn->req_lock);
}
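/* Usage sketch (illustrative only, from worker context, with local disk;
 * the done callback &abw_start_sync is an assumed example name):
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     &abw_start_sync, "set_n_write from attaching",
 *			     BM_LOCKED_MASK);
 *
 * The done callback runs after w_bitmap_io() has performed the IO and
 * application IO was unfrozen again.
 */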
/**
 * drbd_bitmap_io() - Does an IO operation on the whole bitmap
 * @mdev:	DRBD device.
 * @io_fn:	IO callback to be called when bitmap IO is possible
 * @why:	Descriptive text of the reason for doing the IO
 *
 * Freezes application IO while the actual IO operation runs. This
 * function MAY NOT be called from worker context.
 */
int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
		char *why, enum bm_flag flags)
{
	int rv;

	D_ASSERT(current != mdev->tconn->worker.task);

	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
		drbd_suspend_io(mdev);

	drbd_bm_lock(mdev, why, flags);
	rv = io_fn(mdev);
	drbd_bm_unlock(mdev);

	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
		drbd_resume_io(mdev);

	return rv;
}
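/* Usage sketch (illustrative only, from a context other than the worker):
 *
 *	rv = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			    "set_n_write from invalidate", BM_LOCKED_MASK);
 *
 * This synchronously sets all bits and writes out the whole bitmap;
 * whether application IO is suspended around it depends on the flags, as
 * handled above.
 */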
void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
{
	if ((mdev->ldev->md.flags & flag) != flag) {
		drbd_md_mark_dirty(mdev);
		mdev->ldev->md.flags |= flag;
	}
}

void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
{
	if ((mdev->ldev->md.flags & flag) != 0) {
		drbd_md_mark_dirty(mdev);
		mdev->ldev->md.flags &= ~flag;
	}
}

int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
{
	return (bdev->md.flags & flag) != 0;
}
static void md_sync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
}

static int w_md_sync(struct drbd_work *w, int unused)
{
	struct drbd_conf *mdev = w->mdev;

	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
#ifdef DEBUG
	dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
		mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
#endif
	drbd_md_sync(mdev);
	return 0;
}
const char *cmdname(enum drbd_packet cmd)
{
	/* THINK may need to become several global tables
	 * when we want to support more than
	 * one PRO_VERSION */
	static const char *cmdnames[] = {
		[P_DATA]		= "Data",
		[P_DATA_REPLY]		= "DataReply",
		[P_RS_DATA_REPLY]	= "RSDataReply",
		[P_BARRIER]		= "Barrier",
		[P_BITMAP]		= "ReportBitMap",
		[P_BECOME_SYNC_TARGET]	= "BecomeSyncTarget",
		[P_BECOME_SYNC_SOURCE]	= "BecomeSyncSource",
		[P_UNPLUG_REMOTE]	= "UnplugRemote",
		[P_DATA_REQUEST]	= "DataRequest",
		[P_RS_DATA_REQUEST]	= "RSDataRequest",
		[P_SYNC_PARAM]		= "SyncParam",
		[P_SYNC_PARAM89]	= "SyncParam89",
		[P_PROTOCOL]		= "ReportProtocol",
		[P_UUIDS]		= "ReportUUIDs",
		[P_SIZES]		= "ReportSizes",
		[P_STATE]		= "ReportState",
		[P_SYNC_UUID]		= "ReportSyncUUID",
		[P_AUTH_CHALLENGE]	= "AuthChallenge",
		[P_AUTH_RESPONSE]	= "AuthResponse",
		[P_PING]		= "Ping",
		[P_PING_ACK]		= "PingAck",
		[P_RECV_ACK]		= "RecvAck",
		[P_WRITE_ACK]		= "WriteAck",
		[P_RS_WRITE_ACK]	= "RSWriteAck",
		[P_DISCARD_WRITE]	= "DiscardWrite",
		[P_NEG_ACK]		= "NegAck",
		[P_NEG_DREPLY]		= "NegDReply",
		[P_NEG_RS_DREPLY]	= "NegRSDReply",
		[P_BARRIER_ACK]		= "BarrierAck",
		[P_STATE_CHG_REQ]	= "StateChgRequest",
		[P_STATE_CHG_REPLY]	= "StateChgReply",
		[P_OV_REQUEST]		= "OVRequest",
		[P_OV_REPLY]		= "OVReply",
		[P_OV_RESULT]		= "OVResult",
		[P_CSUM_RS_REQUEST]	= "CsumRSRequest",
		[P_RS_IS_IN_SYNC]	= "CsumRSIsInSync",
		[P_COMPRESSED_BITMAP]	= "CBitmap",
		[P_DELAY_PROBE]		= "DelayProbe",
		[P_OUT_OF_SYNC]		= "OutOfSync",
		[P_RETRY_WRITE]		= "RetryWrite",
		[P_RS_CANCEL]		= "RSCancel",
		[P_CONN_ST_CHG_REQ]	= "conn_st_chg_req",
		[P_CONN_ST_CHG_REPLY]	= "conn_st_chg_reply",
		[P_PROTOCOL_UPDATE]	= "protocol_update",

		/* enum drbd_packet, but not commands - obsoleted flags:
		 *	P_MAY_IGNORE
		 *	P_MAX_OPT_CMD
		 */
	};

	/* too big for the array: 0xfffX */
	if (cmd == P_INITIAL_META)
		return "InitialMeta";
	if (cmd == P_INITIAL_DATA)
		return "InitialData";
	if (cmd == P_CONNECTION_FEATURES)
		return "ConnectionFeatures";
	if (cmd >= ARRAY_SIZE(cmdnames))
		return "Unknown";
	return cmdnames[cmd];
}
/**
 * drbd_wait_misc - wait for a request to make progress
 * @mdev:	device associated with the request
 * @i:		the struct drbd_interval embedded in struct drbd_request or
 *		struct drbd_peer_request
 */
int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
{
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	long timeout;

	rcu_read_lock();
	nc = rcu_dereference(mdev->tconn->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -ETIMEDOUT;
	}
	timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
	rcu_read_unlock();

	/* Indicate to wake up mdev->misc_wait on progress. */
	i->waiting = true;
	prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
	spin_unlock_irq(&mdev->tconn->req_lock);
	timeout = schedule_timeout(timeout);
	finish_wait(&mdev->misc_wait, &wait);
	spin_lock_irq(&mdev->tconn->req_lock);
	if (!timeout || mdev->state.conn < C_CONNECTED)
		return -ETIMEDOUT;
	if (signal_pending(current))
		return -ERESTARTSYS;
	return 0;
}
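/* Locking contract, as visible above: drbd_wait_misc() is entered with
 * tconn->req_lock held, drops the lock around schedule_timeout(), and
 * retakes it before returning, so callers may simply loop on it while the
 * condition they wait for is still false.
 */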
#ifdef CONFIG_DRBD_FAULT_INJECTION
/* Fault insertion support including random number generator shamelessly
 * stolen from kernel/rcutorture.c */
struct fault_random_state {
	unsigned long state;
	unsigned long count;
};

#define FAULT_RANDOM_MULT 39916801  /* prime */
#define FAULT_RANDOM_ADD 479001701 /* prime */
#define FAULT_RANDOM_REFRESH 10000

/*
 * Crude but fast random-number generator. Uses a linear congruential
 * generator, with occasional help from get_random_bytes().
 */
static unsigned long
_drbd_fault_random(struct fault_random_state *rsp)
{
	unsigned long refresh;

	if (!rsp->count--) {
		get_random_bytes(&refresh, sizeof(refresh));
		rsp->state += refresh;
		rsp->count = FAULT_RANDOM_REFRESH;
	}
	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
	return swahw32(rsp->state);
}
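/* The generator above is a plain linear congruential generator,
 *
 *	state = state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD  (mod 2^BITS)
 *
 * reseeded from get_random_bytes() every FAULT_RANDOM_REFRESH draws.  The
 * final swahw32() swaps the 16-bit halfwords so callers see the
 * higher-quality high-order bits in the low positions.
 */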
static char *
_drbd_fault_str(unsigned int type) {
	static char *_faults[] = {
		[DRBD_FAULT_MD_WR] = "Meta-data write",
		[DRBD_FAULT_MD_RD] = "Meta-data read",
		[DRBD_FAULT_RS_WR] = "Resync write",
		[DRBD_FAULT_RS_RD] = "Resync read",
		[DRBD_FAULT_DT_WR] = "Data write",
		[DRBD_FAULT_DT_RD] = "Data read",
		[DRBD_FAULT_DT_RA] = "Data read ahead",
		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
		[DRBD_FAULT_AL_EE] = "EE allocation",
		[DRBD_FAULT_RECEIVE] = "receive data corruption",
	};

	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
}
unsigned int
_drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
{
	static struct fault_random_state rrs = {0, 0};

	unsigned int ret = (
		(fault_devs == 0 ||
			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));

	if (ret) {
		fault_count++;

		if (__ratelimit(&drbd_ratelimit_state))
			dev_warn(DEV, "***Simulating %s failure\n",
				_drbd_fault_str(type));
	}

	return ret;
}
#endif
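/* Usage sketch (illustrative only): fault injection sites call this
 * through a wrapper that first checks fault_rate and the enable_faults
 * bitmap, along the lines of
 *
 *	if (drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		bio_endio(bio, -EIO);
 *
 * where drbd_insert_fault() is the guarding wrapper (defined elsewhere,
 * in drbd_int.h in the actual tree).
 */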
const char *drbd_buildtag(void)
{
	/* DRBD built from external sources has here a reference to the
	   git hash of the source code. */

	static char buildtag[38] = "\0uilt-in";

	if (buildtag[0] == 0) {
#ifdef CONFIG_MODULES
		if (THIS_MODULE != NULL)
			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
		else
#endif
			buildtag[0] = 'b';
	}

	return buildtag;
}
module_init(drbd_init)
module_exit(drbd_cleanup)

EXPORT_SYMBOL(drbd_conn_str);
EXPORT_SYMBOL(drbd_role_str);
EXPORT_SYMBOL(drbd_disk_str);
EXPORT_SYMBOL(drbd_set_st_err_str);