drbd: Rename --dry-run to --tentative
[firefly-linux-kernel-4.4.55.git] / drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
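/* Usage sketch (not from the original source; values are examples only): with
 * the module_param() declarations above, these knobs can be set at module load
 * time, e.g.
 *   modprobe drbd minor_count=8 disable_sendpage=1
 * or, when DRBD is built into the kernel, on the kernel command line with the
 * module-name prefix, e.g. drbd.minor_count=8. */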
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
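/* Illustrative sketch (not part of the driver; pp_push()/pp_pop() are
 * hypothetical names): how a page is chained into / taken off the pool
 * described in the comment above, using the page's private field as the
 * "next" pointer.  Assumes drbd_pp_lock is held by the caller.
 *
 *   static void pp_push(struct page *page)
 *   {
 *           set_page_private(page, (unsigned long)drbd_pp_pool);
 *           drbd_pp_pool = page;
 *           drbd_pp_vacant++;
 *   }
 *
 *   static struct page *pp_pop(void)
 *   {
 *           struct page *page = drbd_pp_pool;
 *           if (page) {
 *                   drbd_pp_pool = (struct page *)page_private(page);
 *                   set_page_private(page, 0);
 *                   drbd_pp_vacant--;
 *           }
 *           return page;
 *   }
 */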
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular double linked list of requests
198  * attached.
199  */
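/* Illustrative traversal (a sketch, not called anywhere; inspect_epoch() is a
 * hypothetical helper): visiting all epochs from oldest to newest follows the
 * singly linked ->next pointers while holding tconn->req_lock:
 *
 *   struct drbd_tl_epoch *b;
 *   for (b = tconn->oldest_tle; b != NULL; b = b->next)
 *           inspect_epoch(b);
 *
 * This is the same walk that _tl_restart() below performs.
 */
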
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218
219         return 1;
220 }
221
222 static void tl_cleanup(struct drbd_tconn *tconn)
223 {
224         if (tconn->oldest_tle != tconn->newest_tle)
225                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
226         if (!list_empty(&tconn->out_of_sequence_requests))
227                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
228         kfree(tconn->oldest_tle);
229         tconn->oldest_tle = NULL;
230         kfree(tconn->unused_spare_tle);
231         tconn->unused_spare_tle = NULL;
232 }
233
234 /**
235  * _tl_add_barrier() - Adds a barrier to the transfer log
236  * @tconn:      DRBD connection.
237  * @new:        Barrier to be added before the current head of the TL.
238  *
239  * The caller must hold the req_lock.
240  */
241 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
242 {
243         struct drbd_tl_epoch *newest_before;
244
245         INIT_LIST_HEAD(&new->requests);
246         INIT_LIST_HEAD(&new->w.list);
247         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
248         new->next = NULL;
249         new->n_writes = 0;
250
251         newest_before = tconn->newest_tle;
252         /* never send a barrier number == 0, because that is special-cased
253          * when using TCQ for our write ordering code */
254         new->br_number = (newest_before->br_number+1) ?: 1;
255         if (tconn->newest_tle != new) {
256                 tconn->newest_tle->next = new;
257                 tconn->newest_tle = new;
258         }
259 }
260
261 /**
262  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
263  * @tconn:      DRBD connection.
264  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
265  * @set_size:   Expected number of requests before that barrier.
266  *
267  * In case the passed barrier_nr or set_size does not match the oldest
268  * &struct drbd_tl_epoch object, this function will cause a termination
269  * of the connection.
270  */
271 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
272                 unsigned int set_size)
273 {
274         struct drbd_conf *mdev;
275         struct drbd_tl_epoch *b, *nob; /* next old barrier */
276         struct list_head *le, *tle;
277         struct drbd_request *r;
278
279         spin_lock_irq(&tconn->req_lock);
280
281         b = tconn->oldest_tle;
282
283         /* first some paranoia code */
284         if (b == NULL) {
285                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
286                          barrier_nr);
287                 goto bail;
288         }
289         if (b->br_number != barrier_nr) {
290                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
291                          barrier_nr, b->br_number);
292                 goto bail;
293         }
294         if (b->n_writes != set_size) {
295                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
296                          barrier_nr, set_size, b->n_writes);
297                 goto bail;
298         }
299
300         /* Clean up list of requests processed during current epoch */
301         list_for_each_safe(le, tle, &b->requests) {
302                 r = list_entry(le, struct drbd_request, tl_requests);
303                 _req_mod(r, BARRIER_ACKED);
304         }
305         /* There could be requests on the list waiting for completion
306            of the write to the local disk. To avoid corruption of
307            slab's data structures, we have to remove the list's head.
308
309            Also there could have been a barrier ack out of sequence, overtaking
310            the write acks - which would be a bug and violate write ordering.
311            To not deadlock in case we lose connection while such requests are
312            still pending, we need some way to find them for the
313            _req_mod(CONNECTION_LOST_WHILE_PENDING).
314
315            These have been list_move'd to the out_of_sequence_requests list in
316            _req_mod(, BARRIER_ACKED) above.
317            */
318         list_del_init(&b->requests);
319         mdev = b->w.mdev;
320
321         nob = b->next;
322         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
323                 _tl_add_barrier(tconn, b);
324                 if (nob)
325                         tconn->oldest_tle = nob;
326                 /* if nob == NULL, b was the only barrier, and becomes the new
327                    barrier. Therefore tconn->oldest_tle already points to b */
328         } else {
329                 D_ASSERT(nob != NULL);
330                 tconn->oldest_tle = nob;
331                 kfree(b);
332         }
333
334         spin_unlock_irq(&tconn->req_lock);
335         dec_ap_pending(mdev);
336
337         return;
338
339 bail:
340         spin_unlock_irq(&tconn->req_lock);
341         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
342 }
343
344
345 /**
346  * _tl_restart() - Walks the transfer log, and applies an action to all requests
347  * @tconn:      DRBD connection.
348  * @what:       The action/event to perform with all request objects
349  *
350  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
351  * RESTART_FROZEN_DISK_IO.
352  */
353 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
354 {
355         struct drbd_tl_epoch *b, *tmp, **pn;
356         struct list_head *le, *tle, carry_reads;
357         struct drbd_request *req;
358         int rv, n_writes, n_reads;
359
360         b = tconn->oldest_tle;
361         pn = &tconn->oldest_tle;
362         while (b) {
363                 n_writes = 0;
364                 n_reads = 0;
365                 INIT_LIST_HEAD(&carry_reads);
366                 list_for_each_safe(le, tle, &b->requests) {
367                         req = list_entry(le, struct drbd_request, tl_requests);
368                         rv = _req_mod(req, what);
369
370                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
371                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
372                 }
373                 tmp = b->next;
374
375                 if (n_writes) {
376                         if (what == RESEND) {
377                                 b->n_writes = n_writes;
378                                 if (b->w.cb == NULL) {
379                                         b->w.cb = w_send_barrier;
380                                         inc_ap_pending(b->w.mdev);
381                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
382                                 }
383
384                                 drbd_queue_work(&tconn->data.work, &b->w);
385                         }
386                         pn = &b->next;
387                 } else {
388                         if (n_reads)
389                                 list_add(&carry_reads, &b->requests);
390                         /* there could still be requests on that ring list,
391                          * in case local io is still pending */
392                         list_del(&b->requests);
393
394                         /* dec_ap_pending corresponding to queue_barrier.
395                          * the newest barrier may not have been queued yet,
396                          * in which case w.cb is still NULL. */
397                         if (b->w.cb != NULL)
398                                 dec_ap_pending(b->w.mdev);
399
400                         if (b == tconn->newest_tle) {
401                                 /* recycle, but reinit! */
402                                 if (tmp != NULL)
403                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
404                                 INIT_LIST_HEAD(&b->requests);
405                                 list_splice(&carry_reads, &b->requests);
406                                 INIT_LIST_HEAD(&b->w.list);
407                                 b->w.cb = NULL;
408                                 b->br_number = net_random();
409                                 b->n_writes = 0;
410
411                                 *pn = b;
412                                 break;
413                         }
414                         *pn = tmp;
415                         kfree(b);
416                 }
417                 b = tmp;
418                 list_splice(&carry_reads, &b->requests);
419         }
420 }
421
422
423 /**
424  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
425  * @tconn:      DRBD connection.
426  *
427  * This is called after the connection to the peer was lost. The storage covered
428  * by the requests on the transfer log gets marked as out of sync. Called from the
429  * receiver thread and the worker thread.
430  */
431 void tl_clear(struct drbd_tconn *tconn)
432 {
433         struct drbd_conf *mdev;
434         struct list_head *le, *tle;
435         struct drbd_request *r;
436         int vnr;
437
438         spin_lock_irq(&tconn->req_lock);
439
440         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
441
442         /* we expect this list to be empty. */
443         if (!list_empty(&tconn->out_of_sequence_requests))
444                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
445
446         /* but just in case, clean it up anyway! */
447         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
448                 r = list_entry(le, struct drbd_request, tl_requests);
449                 /* It would be nice to complete outside of spinlock.
450                  * But this is easier for now. */
451                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
452         }
453
454         /* ensure bit indicating barrier is required is clear */
455         rcu_read_lock();
456         idr_for_each_entry(&tconn->volumes, mdev, vnr)
457                 clear_bit(CREATE_BARRIER, &mdev->flags);
458         rcu_read_unlock();
459
460         spin_unlock_irq(&tconn->req_lock);
461 }
462
463 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
464 {
465         spin_lock_irq(&tconn->req_lock);
466         _tl_restart(tconn, what);
467         spin_unlock_irq(&tconn->req_lock);
468 }
469
470 static int drbd_thread_setup(void *arg)
471 {
472         struct drbd_thread *thi = (struct drbd_thread *) arg;
473         struct drbd_tconn *tconn = thi->tconn;
474         unsigned long flags;
475         int retval;
476
477         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
478                  thi->name[0], thi->tconn->name);
479
480 restart:
481         retval = thi->function(thi);
482
483         spin_lock_irqsave(&thi->t_lock, flags);
484
485         /* if the receiver has been "EXITING", the last thing it did
486          * was set the conn state to "StandAlone".
487          * If a re-connect request comes in now, the conn state goes to C_UNCONNECTED
488          * and the receiver thread will be "started".
489          * drbd_thread_start needs to set "RESTARTING" in that case.
490          * t_state check and assignment needs to be within the same spinlock,
491          * so either thread_start sees EXITING, and can remap to RESTARTING,
492          * or thread_start sees NONE, and can proceed as normal.
493          */
494
495         if (thi->t_state == RESTARTING) {
496                 conn_info(tconn, "Restarting %s thread\n", thi->name);
497                 thi->t_state = RUNNING;
498                 spin_unlock_irqrestore(&thi->t_lock, flags);
499                 goto restart;
500         }
501
502         thi->task = NULL;
503         thi->t_state = NONE;
504         smp_mb();
505         complete_all(&thi->stop);
506         spin_unlock_irqrestore(&thi->t_lock, flags);
507
508         conn_info(tconn, "Terminating %s\n", current->comm);
509
510         /* Release mod reference taken when thread was started */
511
512         kref_put(&tconn->kref, &conn_destroy);
513         module_put(THIS_MODULE);
514         return retval;
515 }
516
517 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
518                              int (*func) (struct drbd_thread *), char *name)
519 {
520         spin_lock_init(&thi->t_lock);
521         thi->task    = NULL;
522         thi->t_state = NONE;
523         thi->function = func;
524         thi->tconn = tconn;
525         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
526 }
527
528 int drbd_thread_start(struct drbd_thread *thi)
529 {
530         struct drbd_tconn *tconn = thi->tconn;
531         struct task_struct *nt;
532         unsigned long flags;
533
534         /* is used from state engine doing drbd_thread_stop_nowait,
535          * while holding the req lock irqsave */
536         spin_lock_irqsave(&thi->t_lock, flags);
537
538         switch (thi->t_state) {
539         case NONE:
540                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
541                          thi->name, current->comm, current->pid);
542
543                 /* Get ref on module for thread - this is released when thread exits */
544                 if (!try_module_get(THIS_MODULE)) {
545                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
546                         spin_unlock_irqrestore(&thi->t_lock, flags);
547                         return false;
548                 }
549
550                 kref_get(&thi->tconn->kref);
551
552                 init_completion(&thi->stop);
553                 thi->reset_cpu_mask = 1;
554                 thi->t_state = RUNNING;
555                 spin_unlock_irqrestore(&thi->t_lock, flags);
556                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
557
558                 nt = kthread_create(drbd_thread_setup, (void *) thi,
559                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
560
561                 if (IS_ERR(nt)) {
562                         conn_err(tconn, "Couldn't start thread\n");
563
564                         kref_put(&tconn->kref, &conn_destroy);
565                         module_put(THIS_MODULE);
566                         return false;
567                 }
568                 spin_lock_irqsave(&thi->t_lock, flags);
569                 thi->task = nt;
570                 thi->t_state = RUNNING;
571                 spin_unlock_irqrestore(&thi->t_lock, flags);
572                 wake_up_process(nt);
573                 break;
574         case EXITING:
575                 thi->t_state = RESTARTING;
576                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
577                                 thi->name, current->comm, current->pid);
578                 /* fall through */
579         case RUNNING:
580         case RESTARTING:
581         default:
582                 spin_unlock_irqrestore(&thi->t_lock, flags);
583                 break;
584         }
585
586         return true;
587 }
588
589
590 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
591 {
592         unsigned long flags;
593
594         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
595
596         /* may be called from state engine, holding the req lock irqsave */
597         spin_lock_irqsave(&thi->t_lock, flags);
598
599         if (thi->t_state == NONE) {
600                 spin_unlock_irqrestore(&thi->t_lock, flags);
601                 if (restart)
602                         drbd_thread_start(thi);
603                 return;
604         }
605
606         if (thi->t_state != ns) {
607                 if (thi->task == NULL) {
608                         spin_unlock_irqrestore(&thi->t_lock, flags);
609                         return;
610                 }
611
612                 thi->t_state = ns;
613                 smp_mb();
614                 init_completion(&thi->stop);
615                 if (thi->task != current)
616                         force_sig(DRBD_SIGKILL, thi->task);
617         }
618
619         spin_unlock_irqrestore(&thi->t_lock, flags);
620
621         if (wait)
622                 wait_for_completion(&thi->stop);
623 }
624
625 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
626 {
627         struct drbd_thread *thi =
628                 task == tconn->receiver.task ? &tconn->receiver :
629                 task == tconn->asender.task  ? &tconn->asender :
630                 task == tconn->worker.task   ? &tconn->worker : NULL;
631
632         return thi;
633 }
634
635 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
636 {
637         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
638         return thi ? thi->name : task->comm;
639 }
640
641 int conn_lowest_minor(struct drbd_tconn *tconn)
642 {
643         struct drbd_conf *mdev;
644         int vnr = 0, m;
645
646         rcu_read_lock();
647         mdev = idr_get_next(&tconn->volumes, &vnr);
648         m = mdev ? mdev_to_minor(mdev) : -1;
649         rcu_read_unlock();
650
651         return m;
652 }
653
654 #ifdef CONFIG_SMP
655 /**
656  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
657  * @tconn:      DRBD connection.
658  *
659  * Forces all threads of a device onto the same CPU. This is beneficial for
660  * DRBD's performance. May be overridden by the user's configuration.
661  */
662 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
663 {
664         int ord, cpu;
665
666         /* user override. */
667         if (cpumask_weight(tconn->cpu_mask))
668                 return;
669
670         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
671         for_each_online_cpu(cpu) {
672                 if (ord-- == 0) {
673                         cpumask_set_cpu(cpu, tconn->cpu_mask);
674                         return;
675                 }
676         }
677         /* should not be reached */
678         cpumask_setall(tconn->cpu_mask);
679 }
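/* Worked example (derived from the code above): with four online CPUs and no
 * user-supplied mask, the connection whose lowest minor is 0 gets pinned to
 * the first online CPU, minor 1 to the second, and so on; minor 4 wraps back
 * to the first, because ord = conn_lowest_minor() % cpumask_weight(). */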
680
681 /**
682  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
684  * @thi:        drbd_thread object
685  *
686  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
687  * prematurely.
688  */
689 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
690 {
691         struct task_struct *p = current;
692
693         if (!thi->reset_cpu_mask)
694                 return;
695         thi->reset_cpu_mask = 0;
696         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
697 }
698 #endif
699
700 /**
701  * drbd_header_size  -  size of a packet header
702  *
703  * The header size is a multiple of 8, so any payload following the header is
704  * word aligned on 64-bit architectures.  (The bitmap send and receive code
705  * relies on this.)
706  */
707 unsigned int drbd_header_size(struct drbd_tconn *tconn)
708 {
709         if (tconn->agreed_pro_version >= 100) {
710                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
711                 return sizeof(struct p_header100);
712         } else {
713                 BUILD_BUG_ON(sizeof(struct p_header80) !=
714                              sizeof(struct p_header95));
715                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
716                 return sizeof(struct p_header80);
717         }
718 }
719
720 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
721 {
722         h->magic   = cpu_to_be32(DRBD_MAGIC);
723         h->command = cpu_to_be16(cmd);
724         h->length  = cpu_to_be16(size);
725         return sizeof(struct p_header80);
726 }
727
728 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
729 {
730         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
731         h->command = cpu_to_be16(cmd);
732         h->length = cpu_to_be32(size);
733         return sizeof(struct p_header95);
734 }
735
736 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
737                                       int size, int vnr)
738 {
739         h->magic = cpu_to_be32(DRBD_MAGIC_100);
740         h->volume = cpu_to_be16(vnr);
741         h->command = cpu_to_be16(cmd);
742         h->length = cpu_to_be32(size);
743         h->pad = 0;
744         return sizeof(struct p_header100);
745 }
746
747 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
748                                    void *buffer, enum drbd_packet cmd, int size)
749 {
750         if (tconn->agreed_pro_version >= 100)
751                 return prepare_header100(buffer, cmd, size, vnr);
752         else if (tconn->agreed_pro_version >= 95 &&
753                  size > DRBD_MAX_SIZE_H80_PACKET)
754                 return prepare_header95(buffer, cmd, size);
755         else
756                 return prepare_header80(buffer, cmd, size);
757 }
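/* Dispatch summary (derived from prepare_header() above): protocol >= 100
 * always uses struct p_header100, which also carries the volume number;
 * protocol 95..99 uses struct p_header95 only when the payload exceeds
 * DRBD_MAX_SIZE_H80_PACKET; everything else falls back to struct p_header80. */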
758
759 static void *__conn_prepare_command(struct drbd_tconn *tconn,
760                                     struct drbd_socket *sock)
761 {
762         if (!sock->socket)
763                 return NULL;
764         return sock->sbuf + drbd_header_size(tconn);
765 }
766
767 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
768 {
769         void *p;
770
771         mutex_lock(&sock->mutex);
772         p = __conn_prepare_command(tconn, sock);
773         if (!p)
774                 mutex_unlock(&sock->mutex);
775
776         return p;
777 }
778
779 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
780 {
781         return conn_prepare_command(mdev->tconn, sock);
782 }
783
784 static int __send_command(struct drbd_tconn *tconn, int vnr,
785                           struct drbd_socket *sock, enum drbd_packet cmd,
786                           unsigned int header_size, void *data,
787                           unsigned int size)
788 {
789         int msg_flags;
790         int err;
791
792         /*
793          * Called with @data == NULL and the size of the data blocks in @size
794          * for commands that send data blocks.  For those commands, omit the
795          * MSG_MORE flag: this will increase the likelihood that data blocks
796          * which are page aligned on the sender will end up page aligned on the
797          * receiver.
798          */
799         msg_flags = data ? MSG_MORE : 0;
800
801         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
802                                       header_size + size);
803         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
804                             msg_flags);
805         if (data && !err)
806                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
807         return err;
808 }
809
810 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
811                                enum drbd_packet cmd, unsigned int header_size,
812                                void *data, unsigned int size)
813 {
814         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
815 }
816
817 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
818                       enum drbd_packet cmd, unsigned int header_size,
819                       void *data, unsigned int size)
820 {
821         int err;
822
823         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
824         mutex_unlock(&sock->mutex);
825         return err;
826 }
827
828 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
829                       enum drbd_packet cmd, unsigned int header_size,
830                       void *data, unsigned int size)
831 {
832         int err;
833
834         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
835                              data, size);
836         mutex_unlock(&sock->mutex);
837         return err;
838 }
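/* Usage pattern (a sketch of the convention used throughout this file):
 * *_prepare_command() takes sock->mutex and returns a pointer into sock->sbuf
 * just past the packet header; the matching *_send_command() transmits the
 * packet and releases the mutex.  A failed prepare has already dropped the
 * mutex, so callers simply bail out, as drbd_send_ping() below does:
 *
 *   p = drbd_prepare_command(mdev, sock);
 *   if (!p)
 *           return -EIO;
 *   // ... fill in *p ...
 *   return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
 */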
839
840 int drbd_send_ping(struct drbd_tconn *tconn)
841 {
842         struct drbd_socket *sock;
843
844         sock = &tconn->meta;
845         if (!conn_prepare_command(tconn, sock))
846                 return -EIO;
847         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
848 }
849
850 int drbd_send_ping_ack(struct drbd_tconn *tconn)
851 {
852         struct drbd_socket *sock;
853
854         sock = &tconn->meta;
855         if (!conn_prepare_command(tconn, sock))
856                 return -EIO;
857         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
858 }
859
860 int drbd_send_sync_param(struct drbd_conf *mdev)
861 {
862         struct drbd_socket *sock;
863         struct p_rs_param_95 *p;
864         int size;
865         const int apv = mdev->tconn->agreed_pro_version;
866         enum drbd_packet cmd;
867         struct net_conf *nc;
868         struct disk_conf *dc;
869
870         sock = &mdev->tconn->data;
871         p = drbd_prepare_command(mdev, sock);
872         if (!p)
873                 return -EIO;
874
875         rcu_read_lock();
876         nc = rcu_dereference(mdev->tconn->net_conf);
877
878         size = apv <= 87 ? sizeof(struct p_rs_param)
879                 : apv == 88 ? sizeof(struct p_rs_param)
880                         + strlen(nc->verify_alg) + 1
881                 : apv <= 94 ? sizeof(struct p_rs_param_89)
882                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
883
884         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
885
886         /* initialize verify_alg and csums_alg */
887         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
888
889         if (get_ldev(mdev)) {
890                 dc = rcu_dereference(mdev->ldev->disk_conf);
891                 p->resync_rate = cpu_to_be32(dc->resync_rate);
892                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
893                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
894                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
895                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
896                 put_ldev(mdev);
897         } else {
898                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
899                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
900                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
901                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
902                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
903         }
904
905         if (apv >= 88)
906                 strcpy(p->verify_alg, nc->verify_alg);
907         if (apv >= 89)
908                 strcpy(p->csums_alg, nc->csums_alg);
909         rcu_read_unlock();
910
911         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
912 }
913
914 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
915 {
916         struct drbd_socket *sock;
917         struct p_protocol *p;
918         struct net_conf *nc;
919         int size, cf;
920
921         sock = &tconn->data;
922         p = __conn_prepare_command(tconn, sock);
923         if (!p)
924                 return -EIO;
925
926         rcu_read_lock();
927         nc = rcu_dereference(tconn->net_conf);
928
929         if (nc->tentative && tconn->agreed_pro_version < 92) {
930                 rcu_read_unlock();
931                 mutex_unlock(&sock->mutex);
932                 conn_err(tconn, "--dry-run is not supported by peer");
933                 return -EOPNOTSUPP;
934         }
935
936         size = sizeof(*p);
937         if (tconn->agreed_pro_version >= 87)
938                 size += strlen(nc->integrity_alg) + 1;
939
940         p->protocol      = cpu_to_be32(nc->wire_protocol);
941         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
942         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
943         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
944         p->two_primaries = cpu_to_be32(nc->two_primaries);
945         cf = 0;
946         if (nc->discard_my_data)
947                 cf |= CF_DISCARD_MY_DATA;
948         if (nc->tentative)
949                 cf |= CF_DRY_RUN;
950         p->conn_flags    = cpu_to_be32(cf);
951
952         if (tconn->agreed_pro_version >= 87)
953                 strcpy(p->integrity_alg, nc->integrity_alg);
954         rcu_read_unlock();
955
956         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
957 }
958
959 int drbd_send_protocol(struct drbd_tconn *tconn)
960 {
961         int err;
962
963         mutex_lock(&tconn->data.mutex);
964         err = __drbd_send_protocol(tconn, P_PROTOCOL);
965         mutex_unlock(&tconn->data.mutex);
966
967         return err;
968 }
969
970 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
971 {
972         struct drbd_socket *sock;
973         struct p_uuids *p;
974         int i;
975
976         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
977                 return 0;
978
979         sock = &mdev->tconn->data;
980         p = drbd_prepare_command(mdev, sock);
981         if (!p) {
982                 put_ldev(mdev);
983                 return -EIO;
984         }
985         for (i = UI_CURRENT; i < UI_SIZE; i++)
986                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
987
988         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
989         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
990         rcu_read_lock();
991         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
992         rcu_read_unlock();
993         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
994         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
995         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
996
997         put_ldev(mdev);
998         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
999 }
1000
1001 int drbd_send_uuids(struct drbd_conf *mdev)
1002 {
1003         return _drbd_send_uuids(mdev, 0);
1004 }
1005
1006 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1007 {
1008         return _drbd_send_uuids(mdev, 8);
1009 }
1010
1011 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1012 {
1013         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1014                 u64 *uuid = mdev->ldev->md.uuid;
1015                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1016                      text,
1017                      (unsigned long long)uuid[UI_CURRENT],
1018                      (unsigned long long)uuid[UI_BITMAP],
1019                      (unsigned long long)uuid[UI_HISTORY_START],
1020                      (unsigned long long)uuid[UI_HISTORY_END]);
1021                 put_ldev(mdev);
1022         } else {
1023                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1024                                 text,
1025                                 (unsigned long long)mdev->ed_uuid);
1026         }
1027 }
1028
1029 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1030 {
1031         struct drbd_socket *sock;
1032         struct p_rs_uuid *p;
1033         u64 uuid;
1034
1035         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1036
1037         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1038         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1039         drbd_print_uuids(mdev, "updated sync UUID");
1040         drbd_md_sync(mdev);
1041
1042         sock = &mdev->tconn->data;
1043         p = drbd_prepare_command(mdev, sock);
1044         if (p) {
1045                 p->uuid = cpu_to_be64(uuid);
1046                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1047         }
1048 }
1049
1050 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1051 {
1052         struct drbd_socket *sock;
1053         struct p_sizes *p;
1054         sector_t d_size, u_size;
1055         int q_order_type, max_bio_size;
1056
1057         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1058                 D_ASSERT(mdev->ldev->backing_bdev);
1059                 d_size = drbd_get_max_capacity(mdev->ldev);
1060                 rcu_read_lock();
1061                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1062                 rcu_read_unlock();
1063                 q_order_type = drbd_queue_order_type(mdev);
1064                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1065                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1066                 put_ldev(mdev);
1067         } else {
1068                 d_size = 0;
1069                 u_size = 0;
1070                 q_order_type = QUEUE_ORDERED_NONE;
1071                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1072         }
1073
1074         sock = &mdev->tconn->data;
1075         p = drbd_prepare_command(mdev, sock);
1076         if (!p)
1077                 return -EIO;
1078         p->d_size = cpu_to_be64(d_size);
1079         p->u_size = cpu_to_be64(u_size);
1080         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1081         p->max_bio_size = cpu_to_be32(max_bio_size);
1082         p->queue_order_type = cpu_to_be16(q_order_type);
1083         p->dds_flags = cpu_to_be16(flags);
1084         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1085 }
1086
1087 /**
1088  * drbd_send_state() - Sends the drbd state to the peer
1089  * @mdev:       DRBD device.
1090  */
1091 int drbd_send_state(struct drbd_conf *mdev)
1092 {
1093         struct drbd_socket *sock;
1094         struct p_state *p;
1095
1096         sock = &mdev->tconn->data;
1097         p = drbd_prepare_command(mdev, sock);
1098         if (!p)
1099                 return -EIO;
1100         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1101         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1102 }
1103
1104 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1105 {
1106         struct drbd_socket *sock;
1107         struct p_req_state *p;
1108
1109         sock = &mdev->tconn->data;
1110         p = drbd_prepare_command(mdev, sock);
1111         if (!p)
1112                 return -EIO;
1113         p->mask = cpu_to_be32(mask.i);
1114         p->val = cpu_to_be32(val.i);
1115         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1116
1117 }
1118
1119 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1120 {
1121         enum drbd_packet cmd;
1122         struct drbd_socket *sock;
1123         struct p_req_state *p;
1124
1125         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1126         sock = &tconn->data;
1127         p = conn_prepare_command(tconn, sock);
1128         if (!p)
1129                 return -EIO;
1130         p->mask = cpu_to_be32(mask.i);
1131         p->val = cpu_to_be32(val.i);
1132         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1133 }
1134
1135 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1136 {
1137         struct drbd_socket *sock;
1138         struct p_req_state_reply *p;
1139
1140         sock = &mdev->tconn->meta;
1141         p = drbd_prepare_command(mdev, sock);
1142         if (p) {
1143                 p->retcode = cpu_to_be32(retcode);
1144                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1145         }
1146 }
1147
1148 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1149 {
1150         struct drbd_socket *sock;
1151         struct p_req_state_reply *p;
1152         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1153
1154         sock = &tconn->meta;
1155         p = conn_prepare_command(tconn, sock);
1156         if (p) {
1157                 p->retcode = cpu_to_be32(retcode);
1158                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1159         }
1160 }
1161
1162 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1163 {
1164         BUG_ON(code & ~0xf);
1165         p->encoding = (p->encoding & ~0xf) | code;
1166 }
1167
1168 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1169 {
1170         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1171 }
1172
1173 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1174 {
1175         BUG_ON(n & ~0x7);
1176         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1177 }
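/* Layout of p->encoding as established by the three helpers above (a summary
 * derived from the bit operations):
 *   bit  7     - whether the first run length describes set bits (dcbp_set_start)
 *   bits 6..4  - number of unused pad bits in the last byte      (dcbp_set_pad_bits)
 *   bits 3..0  - bitmap encoding code, e.g. RLE_VLI_Bits         (dcbp_set_code)
 */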
1178
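/* Return convention of fill_bitmap_rle_bits() as relied upon by
 * send_bitmap_rle_or_plain() further down (a summary derived from the code):
 * a value > 0 is the number of compressed code bytes ready in p->code; 0 means
 * "send plain bitmap words instead" (RLE disabled by config or protocol,
 * nothing left to send, payload not compressible, or a VLI encoding problem);
 * -1 signals an unexpected zero run length, i.e. the bitmap changed while it
 * was being scanned. */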
1179 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1180                          struct p_compressed_bm *p,
1181                          unsigned int size,
1182                          struct bm_xfer_ctx *c)
1183 {
1184         struct bitstream bs;
1185         unsigned long plain_bits;
1186         unsigned long tmp;
1187         unsigned long rl;
1188         unsigned len;
1189         unsigned toggle;
1190         int bits, use_rle;
1191
1192         /* may we use this feature? */
1193         rcu_read_lock();
1194         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1195         rcu_read_unlock();
1196         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1197                 return 0;
1198
1199         if (c->bit_offset >= c->bm_bits)
1200                 return 0; /* nothing to do. */
1201
1202         /* use at most this many bytes */
1203         bitstream_init(&bs, p->code, size, 0);
1204         memset(p->code, 0, size);
1205         /* plain bits covered in this code string */
1206         plain_bits = 0;
1207
1208         /* p->encoding & 0x80 stores whether the first run length is set.
1209          * bit offset is implicit.
1210          * start with toggle == 2 to be able to tell the first iteration */
1211         toggle = 2;
1212
1213         /* see how many plain bits we can stuff into one packet
1214          * using RLE and VLI. */
1215         do {
1216                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1217                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1218                 if (tmp == -1UL)
1219                         tmp = c->bm_bits;
1220                 rl = tmp - c->bit_offset;
1221
1222                 if (toggle == 2) { /* first iteration */
1223                         if (rl == 0) {
1224                                 /* the first checked bit was set,
1225                                  * store start value, */
1226                                 dcbp_set_start(p, 1);
1227                                 /* but skip encoding of zero run length */
1228                                 toggle = !toggle;
1229                                 continue;
1230                         }
1231                         dcbp_set_start(p, 0);
1232                 }
1233
1234                 /* paranoia: catch zero runlength.
1235                  * can only happen if bitmap is modified while we scan it. */
1236                 if (rl == 0) {
1237                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1238                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1239                         return -1;
1240                 }
1241
1242                 bits = vli_encode_bits(&bs, rl);
1243                 if (bits == -ENOBUFS) /* buffer full */
1244                         break;
1245                 if (bits <= 0) {
1246                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1247                         return 0;
1248                 }
1249
1250                 toggle = !toggle;
1251                 plain_bits += rl;
1252                 c->bit_offset = tmp;
1253         } while (c->bit_offset < c->bm_bits);
1254
1255         len = bs.cur.b - p->code + !!bs.cur.bit;
1256
1257         if (plain_bits < (len << 3)) {
1258                 /* incompressible with this method.
1259                  * we need to rewind both word and bit position. */
1260                 c->bit_offset -= plain_bits;
1261                 bm_xfer_ctx_bit_to_word_offset(c);
1262                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1263                 return 0;
1264         }
1265
1266         /* RLE + VLI was able to compress it just fine.
1267          * update c->word_offset. */
1268         bm_xfer_ctx_bit_to_word_offset(c);
1269
1270         /* store pad_bits */
1271         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1272
1273         return len;
1274 }
1275
1276 /**
1277  * send_bitmap_rle_or_plain
1278  *
1279  * Return 0 when done, 1 when another iteration is needed, and a negative error
1280  * code upon failure.
1281  */
1282 static int
1283 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1284 {
1285         struct drbd_socket *sock = &mdev->tconn->data;
1286         unsigned int header_size = drbd_header_size(mdev->tconn);
1287         struct p_compressed_bm *p = sock->sbuf + header_size;
1288         int len, err;
1289
1290         len = fill_bitmap_rle_bits(mdev, p,
1291                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1292         if (len < 0)
1293                 return -EIO;
1294
1295         if (len) {
1296                 dcbp_set_code(p, RLE_VLI_Bits);
1297                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1298                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1299                                      NULL, 0);
1300                 c->packets[0]++;
1301                 c->bytes[0] += header_size + sizeof(*p) + len;
1302
1303                 if (c->bit_offset >= c->bm_bits)
1304                         len = 0; /* DONE */
1305         } else {
1306                 /* was not compressible.
1307                  * send a buffer full of plain text bits instead. */
1308                 unsigned int data_size;
1309                 unsigned long num_words;
1310                 unsigned long *p = sock->sbuf + header_size;
1311
1312                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1313                 num_words = min_t(size_t, data_size / sizeof(*p),
1314                                   c->bm_words - c->word_offset);
1315                 len = num_words * sizeof(*p);
1316                 if (len)
1317                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1318                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1319                 c->word_offset += num_words;
1320                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1321
1322                 c->packets[1]++;
1323                 c->bytes[1] += header_size + len;
1324
1325                 if (c->bit_offset > c->bm_bits)
1326                         c->bit_offset = c->bm_bits;
1327         }
1328         if (!err) {
1329                 if (len == 0) {
1330                         INFO_bm_xfer_stats(mdev, "send", c);
1331                         return 0;
1332                 } else
1333                         return 1;
1334         }
1335         return -EIO;
1336 }
1337
1338 /* See the comment at receive_bitmap() */
1339 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1340 {
1341         struct bm_xfer_ctx c;
1342         int err;
1343
1344         if (!expect(mdev->bitmap))
1345                 return false;
1346
1347         if (get_ldev(mdev)) {
1348                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1349                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1350                         drbd_bm_set_all(mdev);
1351                         if (drbd_bm_write(mdev)) {
1352                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1353                                  * but otherwise process as per normal - need to tell other
1354                                  * side that a full resync is required! */
1355                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1356                         } else {
1357                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1358                                 drbd_md_sync(mdev);
1359                         }
1360                 }
1361                 put_ldev(mdev);
1362         }
1363
1364         c = (struct bm_xfer_ctx) {
1365                 .bm_bits = drbd_bm_bits(mdev),
1366                 .bm_words = drbd_bm_words(mdev),
1367         };
1368
1369         do {
1370                 err = send_bitmap_rle_or_plain(mdev, &c);
1371         } while (err > 0);
1372
1373         return err == 0;
1374 }
1375
1376 int drbd_send_bitmap(struct drbd_conf *mdev)
1377 {
1378         struct drbd_socket *sock = &mdev->tconn->data;
1379         int err = -1;
1380
1381         mutex_lock(&sock->mutex);
1382         if (sock->socket)
1383                 err = !_drbd_send_bitmap(mdev);
1384         mutex_unlock(&sock->mutex);
1385         return err;
1386 }
1387
1388 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1389 {
1390         struct drbd_socket *sock;
1391         struct p_barrier_ack *p;
1392
1393         if (mdev->state.conn < C_CONNECTED)
1394                 return;
1395
1396         sock = &mdev->tconn->meta;
1397         p = drbd_prepare_command(mdev, sock);
1398         if (!p)
1399                 return;
1400         p->barrier = barrier_nr;
1401         p->set_size = cpu_to_be32(set_size);
1402         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1403 }
1404
1405 /**
1406  * _drbd_send_ack() - Sends an ack packet
1407  * @mdev:       DRBD device.
1408  * @cmd:        Packet command code.
1409  * @sector:     sector, needs to be in big endian byte order
1410  * @blksize:    size in byte, needs to be in big endian byte order
1411  * @block_id:   Id, big endian byte order
1412  */
1413 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1414                           u64 sector, u32 blksize, u64 block_id)
1415 {
1416         struct drbd_socket *sock;
1417         struct p_block_ack *p;
1418
1419         if (mdev->state.conn < C_CONNECTED)
1420                 return -EIO;
1421
1422         sock = &mdev->tconn->meta;
1423         p = drbd_prepare_command(mdev, sock);
1424         if (!p)
1425                 return -EIO;
1426         p->sector = sector;
1427         p->block_id = block_id;
1428         p->blksize = blksize;
1429         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1430         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1431 }
1432
1433 /* dp->sector and dp->block_id already/still in network byte order,
1434  * data_size is payload size according to dp->head,
1435  * and may need to be corrected for digest size. */
1436 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1437                       struct p_data *dp, int data_size)
1438 {
1439         if (mdev->tconn->peer_integrity_tfm)
1440                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1441         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1442                        dp->block_id);
1443 }
1444
1445 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1446                       struct p_block_req *rp)
1447 {
1448         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1449 }
1450
1451 /**
1452  * drbd_send_ack() - Sends an ack packet
1453  * @mdev:       DRBD device
1454  * @cmd:        packet command code
1455  * @peer_req:   peer request
1456  */
1457 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1458                   struct drbd_peer_request *peer_req)
1459 {
1460         return _drbd_send_ack(mdev, cmd,
1461                               cpu_to_be64(peer_req->i.sector),
1462                               cpu_to_be32(peer_req->i.size),
1463                               peer_req->block_id);
1464 }
1465
1466 /* This function misuses the block_id field to signal if the blocks
1467  * are in sync or not. */
1468 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1469                      sector_t sector, int blksize, u64 block_id)
1470 {
1471         return _drbd_send_ack(mdev, cmd,
1472                               cpu_to_be64(sector),
1473                               cpu_to_be32(blksize),
1474                               cpu_to_be64(block_id));
1475 }
1476
1477 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1478                        sector_t sector, int size, u64 block_id)
1479 {
1480         struct drbd_socket *sock;
1481         struct p_block_req *p;
1482
1483         sock = &mdev->tconn->data;
1484         p = drbd_prepare_command(mdev, sock);
1485         if (!p)
1486                 return -EIO;
1487         p->sector = cpu_to_be64(sector);
1488         p->block_id = block_id;
1489         p->blksize = cpu_to_be32(size);
1490         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1491 }
1492
1493 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1494                             void *digest, int digest_size, enum drbd_packet cmd)
1495 {
1496         struct drbd_socket *sock;
1497         struct p_block_req *p;
1498
1499         /* FIXME: Put the digest into the preallocated socket buffer.  */
1500
1501         sock = &mdev->tconn->data;
1502         p = drbd_prepare_command(mdev, sock);
1503         if (!p)
1504                 return -EIO;
1505         p->sector = cpu_to_be64(sector);
1506         p->block_id = ID_SYNCER /* unused */;
1507         p->blksize = cpu_to_be32(size);
1508         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1509                                  digest, digest_size);
1510 }
1511
1512 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1513 {
1514         struct drbd_socket *sock;
1515         struct p_block_req *p;
1516
1517         sock = &mdev->tconn->data;
1518         p = drbd_prepare_command(mdev, sock);
1519         if (!p)
1520                 return -EIO;
1521         p->sector = cpu_to_be64(sector);
1522         p->block_id = ID_SYNCER /* unused */;
1523         p->blksize = cpu_to_be32(size);
1524         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1525 }
1526
1527 /* called on sndtimeo
1528  * returns false if we should retry,
1529  * true if we think connection is dead
1530  */
1531 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1532 {
1533         int drop_it;
1534         /* long elapsed = (long)(jiffies - mdev->last_received); */
1535
1536         drop_it =   tconn->meta.socket == sock
1537                 || !tconn->asender.task
1538                 || get_t_state(&tconn->asender) != RUNNING
1539                 || tconn->cstate < C_WF_REPORT_PARAMS;
1540
1541         if (drop_it)
1542                 return true;
1543
1544         drop_it = !--tconn->ko_count;
1545         if (!drop_it) {
1546                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1547                          current->comm, current->pid, tconn->ko_count);
1548                 request_ping(tconn);
1549         }
1550
1551         return drop_it; /* && (mdev->state == R_PRIMARY) */
1552 }
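/* A rough worked example of the ko-count policy above (the numbers are purely
 * illustrative; both values come from net_conf and are configurable via
 * drbdsetup): with a 3 s send timeout and ko-count = 7, a peer that stops
 * draining the data socket is tolerated for roughly
 *
 *         7 timeouts * 3 s = 21 s
 *
 * of consecutive sndtimeo expirations before the connection is declared dead.
 * Note that ko_count is reloaded from net_conf->ko_count at the start of every
 * drbd_send() on the data socket (see below), so this budget applies per send
 * operation, not per connection lifetime. */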
1553
1554 static void drbd_update_congested(struct drbd_tconn *tconn)
1555 {
1556         struct sock *sk = tconn->data.socket->sk;
1557         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1558                 set_bit(NET_CONGESTED, &tconn->flags);
1559 }
1560
1561 /* The idea of sendpage seems to be to put some kind of reference
1562  * to the page into the skb, and to hand it over to the NIC. In
1563  * this process get_page() gets called.
1564  *
1565  * As soon as the page was really sent over the network put_page()
1566  * gets called by some part of the network layer. [ NIC driver? ]
1567  *
1568  * [ get_page() / put_page() increment/decrement the count. If count
1569  *   reaches 0 the page will be freed. ]
1570  *
1571  * This works nicely with pages from FSs.
1572  * But this means that in protocol A we might signal IO completion too early!
1573  *
1574  * In order not to corrupt data during a resync we must make sure
1575  * that we do not reuse our own buffer pages (EEs) too early, therefore
1576  * we have the net_ee list.
1577  *
1578  * XFS still seems to have problems with this: it submits pages with page_count == 0!
1579  * As a workaround, we disable sendpage on pages
1580  * with page_count == 0 or PageSlab.
1581  */
1582 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1583                               int offset, size_t size, unsigned msg_flags)
1584 {
1585         struct socket *socket;
1586         void *addr;
1587         int err;
1588
1589         socket = mdev->tconn->data.socket;
1590         addr = kmap(page) + offset;
1591         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1592         kunmap(page);
1593         if (!err)
1594                 mdev->send_cnt += size >> 9;
1595         return err;
1596 }
1597
1598 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1599                     int offset, size_t size, unsigned msg_flags)
1600 {
1601         struct socket *socket = mdev->tconn->data.socket;
1602         mm_segment_t oldfs = get_fs();
1603         int len = size;
1604         int err = -EIO;
1605
1606         /* e.g. XFS meta- & log-data is in slab pages, which have a
1607          * page_count of 0 and/or have PageSlab() set.
1608          * We cannot use sendpage for those, as that does get_page();
1609          * put_page(); and would either trigger a VM_BUG directly, or
1610          * __page_cache_release() a page that is actually still referenced
1611          * by someone else, leading to some obscure delayed Oops elsewhere. */
1612         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1613                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1614
1615         msg_flags |= MSG_NOSIGNAL;
1616         drbd_update_congested(mdev->tconn);
1617         set_fs(KERNEL_DS);
1618         do {
1619                 int sent;
1620
1621                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1622                 if (sent <= 0) {
1623                         if (sent == -EAGAIN) {
1624                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1625                                         break;
1626                                 continue;
1627                         }
1628                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1629                              __func__, (int)size, len, sent);
1630                         if (sent < 0)
1631                                 err = sent;
1632                         break;
1633                 }
1634                 len    -= sent;
1635                 offset += sent;
1636         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1637         set_fs(oldfs);
1638         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1639
1640         if (len == 0) {
1641                 err = 0;
1642                 mdev->send_cnt += size >> 9;
1643         }
1644         return err;
1645 }
1646
1647 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1648 {
1649         struct bio_vec *bvec;
1650         int i;
1651         /* hint all but last page with MSG_MORE */
1652         __bio_for_each_segment(bvec, bio, i, 0) {
1653                 int err;
1654
1655                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1656                                          bvec->bv_offset, bvec->bv_len,
1657                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1658                 if (err)
1659                         return err;
1660         }
1661         return 0;
1662 }
1663
1664 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1665 {
1666         struct bio_vec *bvec;
1667         int i;
1668         /* hint all but last page with MSG_MORE */
1669         __bio_for_each_segment(bvec, bio, i, 0) {
1670                 int err;
1671
1672                 err = _drbd_send_page(mdev, bvec->bv_page,
1673                                       bvec->bv_offset, bvec->bv_len,
1674                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1675                 if (err)
1676                         return err;
1677         }
1678         return 0;
1679 }
1680
1681 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1682                             struct drbd_peer_request *peer_req)
1683 {
1684         struct page *page = peer_req->pages;
1685         unsigned len = peer_req->i.size;
1686         int err;
1687
1688         /* hint all but last page with MSG_MORE */
1689         page_chain_for_each(page) {
1690                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1691
1692                 err = _drbd_send_page(mdev, page, 0, l,
1693                                       page_chain_next(page) ? MSG_MORE : 0);
1694                 if (err)
1695                         return err;
1696                 len -= l;
1697         }
1698         return 0;
1699 }
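/* For orientation: the chain walked by page_chain_for_each() above is linked
 * through the pages' page_private field (see the page_chain_*() helpers in
 * drbd_int.h), the same single-linked-list trick used to build drbd_pp_pool
 * in drbd_create_mempools() further down in this file. */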
1700
1701 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1702 {
1703         if (mdev->tconn->agreed_pro_version >= 95)
1704                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1705                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1706                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1707                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1708         else
1709                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1710 }
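/* The receiving peer applies the inverse mapping when it rebuilds the request.
 * A minimal sketch of that direction, using only the DP_* and REQ_* flags
 * referenced above (illustrative only; the real helper lives on the receive
 * path in drbd_receiver.c and is not part of this file): */
static unsigned long wire_flags_to_bio_sketch(u32 dpf)
{
	/* turn wire DP_* flags back into bio rw flags */
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
}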
1711
1712 /* Used to send write requests
1713  * R_PRIMARY -> Peer    (P_DATA)
1714  */
1715 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1716 {
1717         struct drbd_socket *sock;
1718         struct p_data *p;
1719         unsigned int dp_flags = 0;
1720         int dgs;
1721         int err;
1722
1723         sock = &mdev->tconn->data;
1724         p = drbd_prepare_command(mdev, sock);
1725         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1726                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1727
1728         if (!p)
1729                 return -EIO;
1730         p->sector = cpu_to_be64(req->i.sector);
1731         p->block_id = (unsigned long)req;
1732         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1733         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1734         if (mdev->state.conn >= C_SYNC_SOURCE &&
1735             mdev->state.conn <= C_PAUSED_SYNC_T)
1736                 dp_flags |= DP_MAY_SET_IN_SYNC;
1737         if (mdev->tconn->agreed_pro_version >= 100) {
1738                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1739                         dp_flags |= DP_SEND_RECEIVE_ACK;
1740                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1741                         dp_flags |= DP_SEND_WRITE_ACK;
1742         }
1743         p->dp_flags = cpu_to_be32(dp_flags);
1744         if (dgs)
1745                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1746         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1747         if (!err) {
1748                 /* For protocol A, we have to memcpy the payload into
1749                  * socket buffers, as the request may complete as soon as
1750                  * we have handed it over to tcp, at which point the data
1751                  * pages may become invalid.
1752                  *
1753                  * When data integrity is enabled, we copy it as well, so we can be
1754                  * sure that even if the bio pages are still being modified, the
1755                  * data on the wire does not change; thus, if the digest checks
1756                  * out ok after sending on this side, but does not match on the
1757                  * receiving side, we have definitely detected corruption elsewhere.
1758                  */
1759                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1760                         err = _drbd_send_bio(mdev, req->master_bio);
1761                 else
1762                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1763
1764                 /* double check digest, sometimes buffers have been modified in flight. */
1765                 if (dgs > 0 && dgs <= 64) {
1766                         /* 64 byte, 512 bit, is the largest digest size
1767                          * currently supported in kernel crypto. */
1768                         unsigned char digest[64];
1769                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1770                         if (memcmp(p + 1, digest, dgs)) {
1771                                 dev_warn(DEV,
1772                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1773                                         (unsigned long long)req->i.sector, req->i.size);
1774                         }
1775                 } /* else if (dgs > 64) {
1776                      ... Be noisy about digest too large ...
1777                 } */
1778         }
1779         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1780
1781         return err;
1782 }
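/* Two concrete cases of the payload-path choice made above: with protocol C
 * and no integrity algorithm configured, RQ_EXP_WRITE_ACK is set and dgs is 0,
 * so the zero-copy sendpage path (_drbd_send_zc_bio) is used; with protocol A
 * (neither ack flag set), or whenever an integrity digest is in use, the bio
 * is copied into the socket buffers via _drbd_send_bio, for the reasons given
 * in the comment inside the function. */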
1783
1784 /* answer packet, used to send data back for read requests:
1785  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1786  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1787  */
1788 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1789                     struct drbd_peer_request *peer_req)
1790 {
1791         struct drbd_socket *sock;
1792         struct p_data *p;
1793         int err;
1794         int dgs;
1795
1796         sock = &mdev->tconn->data;
1797         p = drbd_prepare_command(mdev, sock);
1798
1799         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1800                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1801
1802         if (!p)
1803                 return -EIO;
1804         p->sector = cpu_to_be64(peer_req->i.sector);
1805         p->block_id = peer_req->block_id;
1806         p->seq_num = 0;  /* unused */
1807         if (dgs)
1808                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1809         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1810         if (!err)
1811                 err = _drbd_send_zc_ee(mdev, peer_req);
1812         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1813
1814         return err;
1815 }
1816
1817 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1818 {
1819         struct drbd_socket *sock;
1820         struct p_block_desc *p;
1821
1822         sock = &mdev->tconn->data;
1823         p = drbd_prepare_command(mdev, sock);
1824         if (!p)
1825                 return -EIO;
1826         p->sector = cpu_to_be64(req->i.sector);
1827         p->blksize = cpu_to_be32(req->i.size);
1828         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1829 }
1830
1831 /*
1832   drbd_send distinguishes two cases:
1833
1834   Packets sent via the data socket "sock"
1835   and packets sent via the meta data socket "msock"
1836
1837                     sock                      msock
1838   -----------------+-------------------------+------------------------------
1839   timeout           conf.timeout / 2          conf.timeout / 2
1840   timeout action    send a ping via msock     Abort communication
1841                                               and close all sockets
1842 */
1843
1844 /*
1845  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1846  */
1847 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1848               void *buf, size_t size, unsigned msg_flags)
1849 {
1850         struct kvec iov;
1851         struct msghdr msg;
1852         int rv, sent = 0;
1853
1854         if (!sock)
1855                 return -EBADR;
1856
1857         /* THINK  if (signal_pending) return ... ? */
1858
1859         iov.iov_base = buf;
1860         iov.iov_len  = size;
1861
1862         msg.msg_name       = NULL;
1863         msg.msg_namelen    = 0;
1864         msg.msg_control    = NULL;
1865         msg.msg_controllen = 0;
1866         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1867
1868         if (sock == tconn->data.socket) {
1869                 rcu_read_lock();
1870                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1871                 rcu_read_unlock();
1872                 drbd_update_congested(tconn);
1873         }
1874         do {
1875                 /* STRANGE
1876                  * tcp_sendmsg does _not_ use its size parameter at all ?
1877                  *
1878                  * -EAGAIN on timeout, -EINTR on signal.
1879                  */
1880 /* THINK
1881  * do we need to block DRBD_SIG if sock == &meta.socket ??
1882  * otherwise wake_asender() might interrupt some send_*Ack !
1883  */
1884                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1885                 if (rv == -EAGAIN) {
1886                         if (we_should_drop_the_connection(tconn, sock))
1887                                 break;
1888                         else
1889                                 continue;
1890                 }
1891                 if (rv == -EINTR) {
1892                         flush_signals(current);
1893                         rv = 0;
1894                 }
1895                 if (rv < 0)
1896                         break;
1897                 sent += rv;
1898                 iov.iov_base += rv;
1899                 iov.iov_len  -= rv;
1900         } while (sent < size);
1901
1902         if (sock == tconn->data.socket)
1903                 clear_bit(NET_CONGESTED, &tconn->flags);
1904
1905         if (rv <= 0) {
1906                 if (rv != -EAGAIN) {
1907                         conn_err(tconn, "%s_sendmsg returned %d\n",
1908                                  sock == tconn->meta.socket ? "msock" : "sock",
1909                                  rv);
1910                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1911                 } else
1912                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1913         }
1914
1915         return sent;
1916 }
1917
1918 /**
1919  * drbd_send_all  -  Send an entire buffer
1920  *
1921  * Returns 0 upon success and a negative error value otherwise.
1922  */
1923 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1924                   size_t size, unsigned msg_flags)
1925 {
1926         int err;
1927
1928         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1929         if (err < 0)
1930                 return err;
1931         if (err != size)
1932                 return -EIO;
1933         return 0;
1934 }
1935
1936 static int drbd_open(struct block_device *bdev, fmode_t mode)
1937 {
1938         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1939         unsigned long flags;
1940         int rv = 0;
1941
1942         mutex_lock(&drbd_main_mutex);
1943         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1944         /* to have a stable mdev->state.role
1945          * and no race with updating open_cnt */
1946
1947         if (mdev->state.role != R_PRIMARY) {
1948                 if (mode & FMODE_WRITE)
1949                         rv = -EROFS;
1950                 else if (!allow_oos)
1951                         rv = -EMEDIUMTYPE;
1952         }
1953
1954         if (!rv)
1955                 mdev->open_cnt++;
1956         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1957         mutex_unlock(&drbd_main_mutex);
1958
1959         return rv;
1960 }
1961
1962 static int drbd_release(struct gendisk *gd, fmode_t mode)
1963 {
1964         struct drbd_conf *mdev = gd->private_data;
1965         mutex_lock(&drbd_main_mutex);
1966         mdev->open_cnt--;
1967         mutex_unlock(&drbd_main_mutex);
1968         return 0;
1969 }
1970
1971 static void drbd_set_defaults(struct drbd_conf *mdev)
1972 {
1973         /* Beware! The actual layout differs
1974          * between big endian and little endian */
1975         mdev->state = (union drbd_dev_state) {
1976                 { .role = R_SECONDARY,
1977                   .peer = R_UNKNOWN,
1978                   .conn = C_STANDALONE,
1979                   .disk = D_DISKLESS,
1980                   .pdsk = D_UNKNOWN,
1981                 } };
1982 }
1983
1984 void drbd_init_set_defaults(struct drbd_conf *mdev)
1985 {
1986         /* the memset(,0,) did most of this.
1987          * note: only assignments, no allocation in here */
1988
1989         drbd_set_defaults(mdev);
1990
1991         atomic_set(&mdev->ap_bio_cnt, 0);
1992         atomic_set(&mdev->ap_pending_cnt, 0);
1993         atomic_set(&mdev->rs_pending_cnt, 0);
1994         atomic_set(&mdev->unacked_cnt, 0);
1995         atomic_set(&mdev->local_cnt, 0);
1996         atomic_set(&mdev->pp_in_use_by_net, 0);
1997         atomic_set(&mdev->rs_sect_in, 0);
1998         atomic_set(&mdev->rs_sect_ev, 0);
1999         atomic_set(&mdev->ap_in_flight, 0);
2000
2001         mutex_init(&mdev->md_io_mutex);
2002         mutex_init(&mdev->own_state_mutex);
2003         mdev->state_mutex = &mdev->own_state_mutex;
2004
2005         spin_lock_init(&mdev->al_lock);
2006         spin_lock_init(&mdev->peer_seq_lock);
2007         spin_lock_init(&mdev->epoch_lock);
2008
2009         INIT_LIST_HEAD(&mdev->active_ee);
2010         INIT_LIST_HEAD(&mdev->sync_ee);
2011         INIT_LIST_HEAD(&mdev->done_ee);
2012         INIT_LIST_HEAD(&mdev->read_ee);
2013         INIT_LIST_HEAD(&mdev->net_ee);
2014         INIT_LIST_HEAD(&mdev->resync_reads);
2015         INIT_LIST_HEAD(&mdev->resync_work.list);
2016         INIT_LIST_HEAD(&mdev->unplug_work.list);
2017         INIT_LIST_HEAD(&mdev->go_diskless.list);
2018         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2019         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2020         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2021
2022         mdev->resync_work.cb  = w_resync_timer;
2023         mdev->unplug_work.cb  = w_send_write_hint;
2024         mdev->go_diskless.cb  = w_go_diskless;
2025         mdev->md_sync_work.cb = w_md_sync;
2026         mdev->bm_io_work.w.cb = w_bitmap_io;
2027         mdev->start_resync_work.cb = w_start_resync;
2028
2029         mdev->resync_work.mdev  = mdev;
2030         mdev->unplug_work.mdev  = mdev;
2031         mdev->go_diskless.mdev  = mdev;
2032         mdev->md_sync_work.mdev = mdev;
2033         mdev->bm_io_work.w.mdev = mdev;
2034         mdev->start_resync_work.mdev = mdev;
2035
2036         init_timer(&mdev->resync_timer);
2037         init_timer(&mdev->md_sync_timer);
2038         init_timer(&mdev->start_resync_timer);
2039         init_timer(&mdev->request_timer);
2040         mdev->resync_timer.function = resync_timer_fn;
2041         mdev->resync_timer.data = (unsigned long) mdev;
2042         mdev->md_sync_timer.function = md_sync_timer_fn;
2043         mdev->md_sync_timer.data = (unsigned long) mdev;
2044         mdev->start_resync_timer.function = start_resync_timer_fn;
2045         mdev->start_resync_timer.data = (unsigned long) mdev;
2046         mdev->request_timer.function = request_timer_fn;
2047         mdev->request_timer.data = (unsigned long) mdev;
2048
2049         init_waitqueue_head(&mdev->misc_wait);
2050         init_waitqueue_head(&mdev->state_wait);
2051         init_waitqueue_head(&mdev->ee_wait);
2052         init_waitqueue_head(&mdev->al_wait);
2053         init_waitqueue_head(&mdev->seq_wait);
2054
2055         mdev->write_ordering = WO_bdev_flush;
2056         mdev->resync_wenr = LC_FREE;
2057         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2058         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2059 }
2060
2061 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2062 {
2063         int i;
2064         if (mdev->tconn->receiver.t_state != NONE)
2065                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2066                                 mdev->tconn->receiver.t_state);
2067
2068         /* no need to lock it, I'm the only thread alive */
2069         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2070                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2071         mdev->al_writ_cnt  =
2072         mdev->bm_writ_cnt  =
2073         mdev->read_cnt     =
2074         mdev->recv_cnt     =
2075         mdev->send_cnt     =
2076         mdev->writ_cnt     =
2077         mdev->p_size       =
2078         mdev->rs_start     =
2079         mdev->rs_total     =
2080         mdev->rs_failed    = 0;
2081         mdev->rs_last_events = 0;
2082         mdev->rs_last_sect_ev = 0;
2083         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2084                 mdev->rs_mark_left[i] = 0;
2085                 mdev->rs_mark_time[i] = 0;
2086         }
2087         D_ASSERT(mdev->tconn->net_conf == NULL);
2088
2089         drbd_set_my_capacity(mdev, 0);
2090         if (mdev->bitmap) {
2091                 /* maybe never allocated. */
2092                 drbd_bm_resize(mdev, 0, 1);
2093                 drbd_bm_cleanup(mdev);
2094         }
2095
2096         drbd_free_bc(mdev->ldev);
2097         mdev->ldev = NULL;
2098
2099         clear_bit(AL_SUSPENDED, &mdev->flags);
2100
2101         D_ASSERT(list_empty(&mdev->active_ee));
2102         D_ASSERT(list_empty(&mdev->sync_ee));
2103         D_ASSERT(list_empty(&mdev->done_ee));
2104         D_ASSERT(list_empty(&mdev->read_ee));
2105         D_ASSERT(list_empty(&mdev->net_ee));
2106         D_ASSERT(list_empty(&mdev->resync_reads));
2107         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2108         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2109         D_ASSERT(list_empty(&mdev->resync_work.list));
2110         D_ASSERT(list_empty(&mdev->unplug_work.list));
2111         D_ASSERT(list_empty(&mdev->go_diskless.list));
2112
2113         drbd_set_defaults(mdev);
2114 }
2115
2116
2117 static void drbd_destroy_mempools(void)
2118 {
2119         struct page *page;
2120
2121         while (drbd_pp_pool) {
2122                 page = drbd_pp_pool;
2123                 drbd_pp_pool = (struct page *)page_private(page);
2124                 __free_page(page);
2125                 drbd_pp_vacant--;
2126         }
2127
2128         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2129
2130         if (drbd_md_io_bio_set)
2131                 bioset_free(drbd_md_io_bio_set);
2132         if (drbd_md_io_page_pool)
2133                 mempool_destroy(drbd_md_io_page_pool);
2134         if (drbd_ee_mempool)
2135                 mempool_destroy(drbd_ee_mempool);
2136         if (drbd_request_mempool)
2137                 mempool_destroy(drbd_request_mempool);
2138         if (drbd_ee_cache)
2139                 kmem_cache_destroy(drbd_ee_cache);
2140         if (drbd_request_cache)
2141                 kmem_cache_destroy(drbd_request_cache);
2142         if (drbd_bm_ext_cache)
2143                 kmem_cache_destroy(drbd_bm_ext_cache);
2144         if (drbd_al_ext_cache)
2145                 kmem_cache_destroy(drbd_al_ext_cache);
2146
2147         drbd_md_io_bio_set   = NULL;
2148         drbd_md_io_page_pool = NULL;
2149         drbd_ee_mempool      = NULL;
2150         drbd_request_mempool = NULL;
2151         drbd_ee_cache        = NULL;
2152         drbd_request_cache   = NULL;
2153         drbd_bm_ext_cache    = NULL;
2154         drbd_al_ext_cache    = NULL;
2155
2156         return;
2157 }
2158
2159 static int drbd_create_mempools(void)
2160 {
2161         struct page *page;
2162         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2163         int i;
2164
2165         /* prepare our caches and mempools */
2166         drbd_request_mempool = NULL;
2167         drbd_ee_cache        = NULL;
2168         drbd_request_cache   = NULL;
2169         drbd_bm_ext_cache    = NULL;
2170         drbd_al_ext_cache    = NULL;
2171         drbd_pp_pool         = NULL;
2172         drbd_md_io_page_pool = NULL;
2173         drbd_md_io_bio_set   = NULL;
2174
2175         /* caches */
2176         drbd_request_cache = kmem_cache_create(
2177                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2178         if (drbd_request_cache == NULL)
2179                 goto Enomem;
2180
2181         drbd_ee_cache = kmem_cache_create(
2182                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2183         if (drbd_ee_cache == NULL)
2184                 goto Enomem;
2185
2186         drbd_bm_ext_cache = kmem_cache_create(
2187                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2188         if (drbd_bm_ext_cache == NULL)
2189                 goto Enomem;
2190
2191         drbd_al_ext_cache = kmem_cache_create(
2192                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2193         if (drbd_al_ext_cache == NULL)
2194                 goto Enomem;
2195
2196         /* mempools */
2197         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2198         if (drbd_md_io_bio_set == NULL)
2199                 goto Enomem;
2200
2201         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2202         if (drbd_md_io_page_pool == NULL)
2203                 goto Enomem;
2204
2205         drbd_request_mempool = mempool_create(number,
2206                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2207         if (drbd_request_mempool == NULL)
2208                 goto Enomem;
2209
2210         drbd_ee_mempool = mempool_create(number,
2211                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2212         if (drbd_ee_mempool == NULL)
2213                 goto Enomem;
2214
2215         /* drbd's page pool */
2216         spin_lock_init(&drbd_pp_lock);
2217
2218         for (i = 0; i < number; i++) {
2219                 page = alloc_page(GFP_HIGHUSER);
2220                 if (!page)
2221                         goto Enomem;
2222                 set_page_private(page, (unsigned long)drbd_pp_pool);
2223                 drbd_pp_pool = page;
2224         }
2225         drbd_pp_vacant = number;
2226
2227         return 0;
2228
2229 Enomem:
2230         drbd_destroy_mempools(); /* in case we allocated some */
2231         return -ENOMEM;
2232 }
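/* The loop above threads the pre-allocated pages into a LIFO chain through
 * their page_private field. A minimal sketch of the matching pop operation
 * (illustrative only; the function name is made up, and the real consumer on
 * the receive path additionally handles the wait queue, per-device accounting
 * and fallback allocation): */
static struct page *drbd_pp_pop_sketch(void)
{
	struct page *page;

	spin_lock(&drbd_pp_lock);
	page = drbd_pp_pool;
	if (page) {
		/* unlink the head of the chain and clear the link */
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	spin_unlock(&drbd_pp_lock);
	return page;
}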
2233
2234 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2235         void *unused)
2236 {
2237         /* just so we have it.  you never know what interesting things we
2238          * might want to do here some day...
2239          */
2240
2241         return NOTIFY_DONE;
2242 }
2243
2244 static struct notifier_block drbd_notifier = {
2245         .notifier_call = drbd_notify_sys,
2246 };
2247
2248 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2249 {
2250         int rr;
2251
2252         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2253         if (rr)
2254                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2255
2256         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2257         if (rr)
2258                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2259
2260         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2261         if (rr)
2262                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2263
2264         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2265         if (rr)
2266                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2267
2268         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2269         if (rr)
2270                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2271 }
2272
2273 /* caution. no locking. */
2274 void drbd_minor_destroy(struct kref *kref)
2275 {
2276         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2277         struct drbd_tconn *tconn = mdev->tconn;
2278
2279         /* paranoia asserts */
2280         D_ASSERT(mdev->open_cnt == 0);
2281         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2282         /* end paranoia asserts */
2283
2284         /* cleanup stuff that may have been allocated during
2285          * device (re-)configuration or state changes */
2286
2287         if (mdev->this_bdev)
2288                 bdput(mdev->this_bdev);
2289
2290         drbd_free_bc(mdev->ldev);
2291         mdev->ldev = NULL;
2292
2293         drbd_release_all_peer_reqs(mdev);
2294
2295         lc_destroy(mdev->act_log);
2296         lc_destroy(mdev->resync);
2297
2298         kfree(mdev->p_uuid);
2299         /* mdev->p_uuid = NULL; */
2300
2301         kfree(mdev->current_epoch);
2302         if (mdev->bitmap) /* should no longer be there. */
2303                 drbd_bm_cleanup(mdev);
2304         __free_page(mdev->md_io_page);
2305         put_disk(mdev->vdisk);
2306         blk_cleanup_queue(mdev->rq_queue);
2307         kfree(mdev->rs_plan_s);
2308         kfree(mdev);
2309
2310         kref_put(&tconn->kref, &conn_destroy);
2311 }
2312
2313 static void drbd_cleanup(void)
2314 {
2315         unsigned int i;
2316         struct drbd_conf *mdev;
2317         struct drbd_tconn *tconn, *tmp;
2318
2319         unregister_reboot_notifier(&drbd_notifier);
2320
2321         /* first remove proc,
2322          * drbdsetup uses its presence to detect
2323          * whether DRBD is loaded.
2324          * If we got stuck in proc removal,
2325          * but had netlink already deregistered,
2326          * some drbdsetup commands may wait forever
2327          * for an answer.
2328          */
2329         if (drbd_proc)
2330                 remove_proc_entry("drbd", NULL);
2331
2332         drbd_genl_unregister();
2333
2334         idr_for_each_entry(&minors, mdev, i) {
2335                 idr_remove(&minors, mdev_to_minor(mdev));
2336                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2337                 del_gendisk(mdev->vdisk);
2338                 /* synchronize_rcu(); No other threads running at this point */
2339                 kref_put(&mdev->kref, &drbd_minor_destroy);
2340         }
2341
2342         /* not _rcu since there is no other updater anymore; genl is already unregistered */
2343         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2344                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2345                 /* synchronize_rcu(); */
2346                 kref_put(&tconn->kref, &conn_destroy);
2347         }
2348
2349         drbd_destroy_mempools();
2350         unregister_blkdev(DRBD_MAJOR, "drbd");
2351
2352         idr_destroy(&minors);
2353
2354         printk(KERN_INFO "drbd: module cleanup done.\n");
2355 }
2356
2357 /**
2358  * drbd_congested() - Congestion callback for the backing_dev_info of this device
2359  * @congested_data:     User data (the drbd_conf of this device)
2360  * @bdi_bits:           Congestion bits the caller is currently interested in
2361  *
2362  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2363  */
2364 static int drbd_congested(void *congested_data, int bdi_bits)
2365 {
2366         struct drbd_conf *mdev = congested_data;
2367         struct request_queue *q;
2368         char reason = '-';
2369         int r = 0;
2370
2371         if (!may_inc_ap_bio(mdev)) {
2372                 /* DRBD has frozen IO */
2373                 r = bdi_bits;
2374                 reason = 'd';
2375                 goto out;
2376         }
2377
2378         if (get_ldev(mdev)) {
2379                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2380                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2381                 put_ldev(mdev);
2382                 if (r)
2383                         reason = 'b';
2384         }
2385
2386         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2387                 r |= (1 << BDI_async_congested);
2388                 reason = reason == 'b' ? 'a' : 'n';
2389         }
2390
2391 out:
2392         mdev->congestion_reason = reason;
2393         return r;
2394 }
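/* For reference, the congestion_reason character recorded above decodes as:
 * 'd' - DRBD itself has frozen IO (may_inc_ap_bio() failed),
 * 'b' - the backing device is congested,
 * 'n' - only the network send path is congested (NET_CONGESTED),
 * 'a' - both backing device and network are congested,
 * '-' - not congested. */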
2395
2396 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2397 {
2398         sema_init(&wq->s, 0);
2399         spin_lock_init(&wq->q_lock);
2400         INIT_LIST_HEAD(&wq->q);
2401 }
2402
2403 struct drbd_tconn *conn_get_by_name(const char *name)
2404 {
2405         struct drbd_tconn *tconn;
2406
2407         if (!name || !name[0])
2408                 return NULL;
2409
2410         rcu_read_lock();
2411         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2412                 if (!strcmp(tconn->name, name)) {
2413                         kref_get(&tconn->kref);
2414                         goto found;
2415                 }
2416         }
2417         tconn = NULL;
2418 found:
2419         rcu_read_unlock();
2420         return tconn;
2421 }
2422
2423 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2424                                      void *peer_addr, int peer_addr_len)
2425 {
2426         struct drbd_tconn *tconn;
2427
2428         rcu_read_lock();
2429         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2430                 if (tconn->my_addr_len == my_addr_len &&
2431                     tconn->peer_addr_len == peer_addr_len &&
2432                     !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2433                     !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2434                         kref_get(&tconn->kref);
2435                         goto found;
2436                 }
2437         }
2438         tconn = NULL;
2439 found:
2440         rcu_read_unlock();
2441         return tconn;
2442 }
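/* Both lookup helpers above return the connection with an extra kref held.
 * A minimal usage sketch (the resource name "r0" is only an example, and this
 * function is illustrative, not part of the driver) showing the kref_put()
 * the caller owes, matching how references are dropped elsewhere in this file: */
static void conn_lookup_usage_sketch(void)
{
	struct drbd_tconn *tconn;

	tconn = conn_get_by_name("r0");	/* example resource name */
	if (tconn) {
		/* ... use the connection ... */
		kref_put(&tconn->kref, &conn_destroy);
	}
}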
2443
2444 static int drbd_alloc_socket(struct drbd_socket *socket)
2445 {
2446         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2447         if (!socket->rbuf)
2448                 return -ENOMEM;
2449         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2450         if (!socket->sbuf)
2451                 return -ENOMEM;
2452         return 0;
2453 }
2454
2455 static void drbd_free_socket(struct drbd_socket *socket)
2456 {
2457         free_page((unsigned long) socket->sbuf);
2458         free_page((unsigned long) socket->rbuf);
2459 }
2460
2461 void conn_free_crypto(struct drbd_tconn *tconn)
2462 {
2463         drbd_free_sock(tconn);
2464
2465         crypto_free_hash(tconn->csums_tfm);
2466         crypto_free_hash(tconn->verify_tfm);
2467         crypto_free_hash(tconn->cram_hmac_tfm);
2468         crypto_free_hash(tconn->integrity_tfm);
2469         crypto_free_hash(tconn->peer_integrity_tfm);
2470         kfree(tconn->int_dig_in);
2471         kfree(tconn->int_dig_vv);
2472
2473         tconn->csums_tfm = NULL;
2474         tconn->verify_tfm = NULL;
2475         tconn->cram_hmac_tfm = NULL;
2476         tconn->integrity_tfm = NULL;
2477         tconn->peer_integrity_tfm = NULL;
2478         tconn->int_dig_in = NULL;
2479         tconn->int_dig_vv = NULL;
2480 }
2481
2482 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2483 {
2484         cpumask_var_t new_cpu_mask;
2485         int err;
2486
2487         if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2488                 return -ENOMEM;
2489                 /*
2490                 retcode = ERR_NOMEM;
2491                 drbd_msg_put_info("unable to allocate cpumask");
2492                 */
2493
2494         /* silently ignore cpu mask on UP kernel */
2495         if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2496                 /* FIXME: Get rid of constant 32 here */
2497                 err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
2498                                 cpumask_bits(new_cpu_mask), nr_cpu_ids);
2499                 if (err) {
2500                         conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
2501                         /* retcode = ERR_CPU_MASK_PARSE; */
2502                         goto fail;
2503                 }
2504         }
2505         tconn->res_opts = *res_opts;
2506         if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2507                 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2508                 drbd_calc_cpu_mask(tconn);
2509                 tconn->receiver.reset_cpu_mask = 1;
2510                 tconn->asender.reset_cpu_mask = 1;
2511                 tconn->worker.reset_cpu_mask = 1;
2512         }
2513         err = 0;
2514
2515 fail:
2516         free_cpumask_var(new_cpu_mask);
2517         return err;
2518
2519 }
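/* A small worked example of the cpu_mask handling above (the value is
 * illustrative): __bitmap_parse() interprets res_opts->cpu_mask as a
 * hexadecimal bitmap string, so a mask of "3" selects CPUs 0 and 1; if that
 * differs from the current mask, the receiver, worker and asender threads are
 * asked to re-apply it via their reset_cpu_mask flags. An empty or all-zero
 * mask leaves the threads unpinned, and on a uniprocessor kernel the mask is
 * ignored entirely, as the comment above notes. */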
2520
2521 /* caller must be under genl_lock() */
2522 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2523 {
2524         struct drbd_tconn *tconn;
2525
2526         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2527         if (!tconn)
2528                 return NULL;
2529
2530         tconn->name = kstrdup(name, GFP_KERNEL);
2531         if (!tconn->name)
2532                 goto fail;
2533
2534         if (drbd_alloc_socket(&tconn->data))
2535                 goto fail;
2536         if (drbd_alloc_socket(&tconn->meta))
2537                 goto fail;
2538
2539         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2540                 goto fail;
2541
2542         if (set_resource_options(tconn, res_opts))
2543                 goto fail;
2544
2545         if (!tl_init(tconn))
2546                 goto fail;
2547
2548         tconn->cstate = C_STANDALONE;
2549         mutex_init(&tconn->cstate_mutex);
2550         spin_lock_init(&tconn->req_lock);
2551         mutex_init(&tconn->conf_update);
2552         init_waitqueue_head(&tconn->ping_wait);
2553         idr_init(&tconn->volumes);
2554
2555         drbd_init_workqueue(&tconn->data.work);
2556         mutex_init(&tconn->data.mutex);
2557
2558         drbd_init_workqueue(&tconn->meta.work);
2559         mutex_init(&tconn->meta.mutex);
2560
2561         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2562         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2563         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2564
2565         kref_init(&tconn->kref);
2566         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2567
2568         return tconn;
2569
2570 fail:
2571         tl_cleanup(tconn);
2572         free_cpumask_var(tconn->cpu_mask);
2573         drbd_free_socket(&tconn->meta);
2574         drbd_free_socket(&tconn->data);
2575         kfree(tconn->name);
2576         kfree(tconn);
2577
2578         return NULL;
2579 }
2580
2581 void conn_destroy(struct kref *kref)
2582 {
2583         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2584
2585         idr_destroy(&tconn->volumes);
2586
2587         free_cpumask_var(tconn->cpu_mask);
2588         drbd_free_socket(&tconn->meta);
2589         drbd_free_socket(&tconn->data);
2590         kfree(tconn->name);
2591         kfree(tconn->int_dig_in);
2592         kfree(tconn->int_dig_vv);
2593         kfree(tconn);
2594 }
2595
2596 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2597 {
2598         struct drbd_conf *mdev;
2599         struct gendisk *disk;
2600         struct request_queue *q;
2601         int vnr_got = vnr;
2602         int minor_got = minor;
2603         enum drbd_ret_code err = ERR_NOMEM;
2604
2605         mdev = minor_to_mdev(minor);
2606         if (mdev)
2607                 return ERR_MINOR_EXISTS;
2608
2609         /* GFP_KERNEL, we are outside of all write-out paths */
2610         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2611         if (!mdev)
2612                 return ERR_NOMEM;
2613
2614         kref_get(&tconn->kref);
2615         mdev->tconn = tconn;
2616
2617         mdev->minor = minor;
2618         mdev->vnr = vnr;
2619
2620         drbd_init_set_defaults(mdev);
2621
2622         q = blk_alloc_queue(GFP_KERNEL);
2623         if (!q)
2624                 goto out_no_q;
2625         mdev->rq_queue = q;
2626         q->queuedata   = mdev;
2627
2628         disk = alloc_disk(1);
2629         if (!disk)
2630                 goto out_no_disk;
2631         mdev->vdisk = disk;
2632
2633         set_disk_ro(disk, true);
2634
2635         disk->queue = q;
2636         disk->major = DRBD_MAJOR;
2637         disk->first_minor = minor;
2638         disk->fops = &drbd_ops;
2639         sprintf(disk->disk_name, "drbd%d", minor);
2640         disk->private_data = mdev;
2641
2642         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2643         /* we have no partitions. we contain only ourselves. */
2644         mdev->this_bdev->bd_contains = mdev->this_bdev;
2645
2646         q->backing_dev_info.congested_fn = drbd_congested;
2647         q->backing_dev_info.congested_data = mdev;
2648
2649         blk_queue_make_request(q, drbd_make_request);
2650         /* Setting the max_hw_sectors to an odd value of 8 KiB here;
2651            this triggers a max_bio_size message upon first attach or connect */
2652         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2653         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2654         blk_queue_merge_bvec(q, drbd_merge_bvec);
2655         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2656
2657         mdev->md_io_page = alloc_page(GFP_KERNEL);
2658         if (!mdev->md_io_page)
2659                 goto out_no_io_page;
2660
2661         if (drbd_bm_init(mdev))
2662                 goto out_no_bitmap;
2663         mdev->read_requests = RB_ROOT;
2664         mdev->write_requests = RB_ROOT;
2665
2666         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2667         if (!mdev->current_epoch)
2668                 goto out_no_epoch;
2669
2670         INIT_LIST_HEAD(&mdev->current_epoch->list);
2671         mdev->epochs = 1;
2672
2673         if (!idr_pre_get(&minors, GFP_KERNEL))
2674                 goto out_no_minor_idr;
2675         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2676                 goto out_no_minor_idr;
2677         if (minor_got != minor) {
2678                 err = ERR_MINOR_EXISTS;
2679                 drbd_msg_put_info("requested minor exists already");
2680                 goto out_idr_remove_minor;
2681         }
2682
2683         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2684                 goto out_idr_remove_minor;
2685         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2686                 goto out_idr_remove_minor;
2687         if (vnr_got != vnr) {
2688                 err = ERR_INVALID_REQUEST;
2689                 drbd_msg_put_info("requested volume exists already");
2690                 goto out_idr_remove_vol;
2691         }
2692         add_disk(disk);
2693         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2694
2695         /* inherit the connection state */
2696         mdev->state.conn = tconn->cstate;
2697         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2698                 drbd_connected(mdev);
2699
2700         return NO_ERROR;
2701
2702 out_idr_remove_vol:
2703         idr_remove(&tconn->volumes, vnr_got);
2704 out_idr_remove_minor:
2705         idr_remove(&minors, minor_got);
2706         synchronize_rcu();
2707 out_no_minor_idr:
2708         kfree(mdev->current_epoch);
2709 out_no_epoch:
2710         drbd_bm_cleanup(mdev);
2711 out_no_bitmap:
2712         __free_page(mdev->md_io_page);
2713 out_no_io_page:
2714         put_disk(disk);
2715 out_no_disk:
2716         blk_cleanup_queue(q);
2717 out_no_q:
2718         kfree(mdev);
2719         kref_put(&tconn->kref, &conn_destroy);
2720         return err;
2721 }
2722
2723 int __init drbd_init(void)
2724 {
2725         int err;
2726
2727         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2728                 printk(KERN_ERR
2729                        "drbd: invalid minor_count (%d)\n", minor_count);
2730 #ifdef MODULE
2731                 return -EINVAL;
2732 #else
2733                 minor_count = DRBD_MINOR_COUNT_DEF;
2734 #endif
2735         }
2736
2737         err = register_blkdev(DRBD_MAJOR, "drbd");
2738         if (err) {
2739                 printk(KERN_ERR
2740                        "drbd: unable to register block device major %d\n",
2741                        DRBD_MAJOR);
2742                 return err;
2743         }
2744
2745         err = drbd_genl_register();
2746         if (err) {
2747                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2748                 goto fail;
2749         }
2750
2751
2752         register_reboot_notifier(&drbd_notifier);
2753
2754         /*
2755          * allocate all necessary structs
2756          */
2757         err = -ENOMEM;
2758
2759         init_waitqueue_head(&drbd_pp_wait);
2760
2761         drbd_proc = NULL; /* play safe for drbd_cleanup */
2762         idr_init(&minors);
2763
2764         err = drbd_create_mempools();
2765         if (err)
2766                 goto fail;
2767
2768         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2769         if (!drbd_proc) {
2770                 printk(KERN_ERR "drbd: unable to register proc file\n");
2771                 goto fail;
2772         }
2773
2774         rwlock_init(&global_state_lock);
2775         INIT_LIST_HEAD(&drbd_tconns);
2776
2777         printk(KERN_INFO "drbd: initialized. "
2778                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2779                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2780         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2781         printk(KERN_INFO "drbd: registered as block device major %d\n",
2782                 DRBD_MAJOR);
2783
2784         return 0; /* Success! */
2785
2786 fail:
2787         drbd_cleanup();
2788         if (err == -ENOMEM)
2789                 /* currently always the case */
2790                 printk(KERN_ERR "drbd: ran out of memory\n");
2791         else
2792                 printk(KERN_ERR "drbd: initialization failure\n");
2793         return err;
2794 }
2795
2796 void drbd_free_bc(struct drbd_backing_dev *ldev)
2797 {
2798         if (ldev == NULL)
2799                 return;
2800
2801         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2802         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2803
2804         kfree(ldev);
2805 }
2806
2807 void drbd_free_sock(struct drbd_tconn *tconn)
2808 {
2809         if (tconn->data.socket) {
2810                 mutex_lock(&tconn->data.mutex);
2811                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2812                 sock_release(tconn->data.socket);
2813                 tconn->data.socket = NULL;
2814                 mutex_unlock(&tconn->data.mutex);
2815         }
2816         if (tconn->meta.socket) {
2817                 mutex_lock(&tconn->meta.mutex);
2818                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2819                 sock_release(tconn->meta.socket);
2820                 tconn->meta.socket = NULL;
2821                 mutex_unlock(&tconn->meta.mutex);
2822         }
2823 }
2824
2825 /* meta data management */
2826
2827 struct meta_data_on_disk {
2828         u64 la_size;           /* last agreed size. */
2829         u64 uuid[UI_SIZE];   /* UUIDs. */
2830         u64 device_uuid;
2831         u64 reserved_u64_1;
2832         u32 flags;             /* MDF */
2833         u32 magic;
2834         u32 md_size_sect;
2835         u32 al_offset;         /* offset to this block */
2836         u32 al_nr_extents;     /* important for restoring the AL */
2837               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2838         u32 bm_offset;         /* offset to the bitmap, from here */
2839         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2840         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2841         u32 reserved_u32[3];
2842
2843 } __packed;
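/* A small layout sanity check for the struct above (assuming UI_SIZE is 4,
 * i.e. the current, bitmap and two history UUID slots): 3 x 8 bytes of plain
 * u64 fields + 4 x 8 bytes of UUIDs + 11 x 4 bytes of u32 fields = 100 bytes.
 * drbd_md_sync() below zeroes a full 512-byte buffer before filling it in, so
 * the rest of the on-disk sector is written as zeros, and every multi-byte
 * field is stored big endian via the cpu_to_be*() conversions. */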
2844
2845 /**
2846  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2847  * @mdev:       DRBD device.
2848  */
2849 void drbd_md_sync(struct drbd_conf *mdev)
2850 {
2851         struct meta_data_on_disk *buffer;
2852         sector_t sector;
2853         int i;
2854
2855         del_timer(&mdev->md_sync_timer);
2856         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2857         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2858                 return;
2859
2860         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
2861          * metadata even if we detach due to a disk failure! */
2862         if (!get_ldev_if_state(mdev, D_FAILED))
2863                 return;
2864
2865         mutex_lock(&mdev->md_io_mutex);
2866         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2867         memset(buffer, 0, 512);
2868
2869         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2870         for (i = UI_CURRENT; i < UI_SIZE; i++)
2871                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2872         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2873         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2874
2875         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2876         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2877         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2878         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2879         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2880
2881         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2882         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2883
2884         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2885         sector = mdev->ldev->md.md_offset;
2886
2887         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2888                 /* this was only a try anyway ... */
2889                 dev_err(DEV, "meta data update failed!\n");
2890                 drbd_chk_io_error(mdev, 1, true);
2891         }
2892
2893         /* Update mdev->ldev->md.la_size_sect,
2894          * since we just wrote the updated size to the on-disk metadata. */
2895         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2896
2897         mutex_unlock(&mdev->md_io_mutex);
2898         put_ldev(mdev);
2899 }
2900
2901 /**
2902  * drbd_md_read() - Reads in the meta data super block
2903  * @mdev:       DRBD device.
2904  * @bdev:       Device from which the meta data should be read in.
2905  *
2906  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2907  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2908  */
2909 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2910 {
2911         struct meta_data_on_disk *buffer;
2912         int i, rv = NO_ERROR;
2913
2914         if (!get_ldev_if_state(mdev, D_ATTACHING))
2915                 return ERR_IO_MD_DISK;
2916
2917         mutex_lock(&mdev->md_io_mutex);
2918         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2919
2920         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2921                 /* NOTE: can't do normal error processing here as this is
2922                    called BEFORE disk is attached */
2923                 dev_err(DEV, "Error while reading metadata.\n");
2924                 rv = ERR_IO_MD_DISK;
2925                 goto err;
2926         }
2927
2928         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2929                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2930                 rv = ERR_MD_INVALID;
2931                 goto err;
2932         }
2933         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2934                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2935                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2936                 rv = ERR_MD_INVALID;
2937                 goto err;
2938         }
2939         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2940                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2941                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2942                 rv = ERR_MD_INVALID;
2943                 goto err;
2944         }
2945         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2946                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2947                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2948                 rv = ERR_MD_INVALID;
2949                 goto err;
2950         }
2951
2952         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2953                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2954                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2955                 rv = ERR_MD_INVALID;
2956                 goto err;
2957         }
2958
2959         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2960         for (i = UI_CURRENT; i < UI_SIZE; i++)
2961                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2962         bdev->md.flags = be32_to_cpu(buffer->flags);
2963         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2964
2965         spin_lock_irq(&mdev->tconn->req_lock);
2966         if (mdev->state.conn < C_CONNECTED) {
2967                 int peer;
2968                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2969                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2970                 mdev->peer_max_bio_size = peer;
2971         }
2972         spin_unlock_irq(&mdev->tconn->req_lock);
2973
2974         /* This block wants to be removed... */
2975         bdev->disk_conf->al_extents = be32_to_cpu(buffer->al_nr_extents);
2976         if (bdev->disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
2977                 bdev->disk_conf->al_extents = DRBD_AL_EXTENTS_DEF;
2978
2979  err:
2980         mutex_unlock(&mdev->md_io_mutex);
2981         put_ldev(mdev);
2982
2983         return rv;
2984 }
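
/*
 * Illustrative caller sketch (not part of the driver): check the return
 * codes documented above, as the attach code path does.  The helper name
 * example_probe_md() is hypothetical; everything else is defined in drbd.
 */
static int __maybe_unused example_probe_md(struct drbd_conf *mdev,
					   struct drbd_backing_dev *nbc)
{
	int err = drbd_md_read(mdev, nbc);

	if (err != NO_ERROR)	/* ERR_IO_MD_DISK or ERR_MD_INVALID */
		dev_err(DEV, "meta data unusable, refusing to attach (%d)\n", err);
	return err;
}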
2985
2986 /**
2987  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2988  * @mdev:       DRBD device.
2989  *
2990  * Call this function if you change anything that should be written to
2991  * the meta-data super block. This function sets MD_DIRTY, and starts a
2992  * timer that ensures that within five seconds you have to call drbd_md_sync().
2993  */
2994 #ifdef DEBUG
2995 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2996 {
2997         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2998                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2999                 mdev->last_md_mark_dirty.line = line;
3000                 mdev->last_md_mark_dirty.func = func;
3001         }
3002 }
3003 #else
3004 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3005 {
3006         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3007                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3008 }
3009 #endif
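
/*
 * Usage sketch (illustrative only): a typical meta data update marks the
 * super block dirty via one of the helpers below and then syncs it right
 * away; if the explicit drbd_md_sync() is omitted, the md_sync_timer armed
 * above fires after about five seconds and w_md_sync() (further down in
 * this file) performs the sync from worker context.  example_md_update()
 * is hypothetical; the drbd_* calls and MDF_FULL_SYNC are real.
 */
static void __maybe_unused example_md_update(struct drbd_conf *mdev)
{
	if (!get_ldev(mdev))	/* ->ldev->md is only valid with a local ref */
		return;
	drbd_md_set_flag(mdev, MDF_FULL_SYNC);	/* marks the super block dirty */
	drbd_md_sync(mdev);			/* write it out now */
	put_ldev(mdev);
}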
3010
3011 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3012 {
3013         int i;
3014
3015         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3016                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3017 }
3018
3019 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3020 {
3021         if (idx == UI_CURRENT) {
3022                 if (mdev->state.role == R_PRIMARY)
3023                         val |= 1;
3024                 else
3025                         val &= ~((u64)1);
3026
3027                 drbd_set_ed_uuid(mdev, val);
3028         }
3029
3030         mdev->ldev->md.uuid[idx] = val;
3031         drbd_md_mark_dirty(mdev);
3032 }
3033
3034
3035 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3036 {
3037         if (mdev->ldev->md.uuid[idx]) {
3038                 drbd_uuid_move_history(mdev);
3039                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3040         }
3041         _drbd_uuid_set(mdev, idx, val);
3042 }
3043
3044 /**
3045  * drbd_uuid_new_current() - Creates a new current UUID
3046  * @mdev:       DRBD device.
3047  *
3048  * Creates a new current UUID, and rotates the old current UUID into
3049  * the bitmap slot. Causes an incremental resync upon next connect.
3050  */
3051 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3052 {
3053         u64 val;
3054         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3055
3056         if (bm_uuid)
3057                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3058
3059         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3060
3061         get_random_bytes(&val, sizeof(u64));
3062         _drbd_uuid_set(mdev, UI_CURRENT, val);
3063         drbd_print_uuids(mdev, "new current UUID");
3064         /* get it to stable storage _now_ */
3065         drbd_md_sync(mdev);
3066 }
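
/*
 * Illustrative call site sketch: the UUID rotation above touches
 * mdev->ldev->md and therefore needs a local disk reference, so callers
 * wrap it in get_ldev()/put_ldev() as shown.  example_rotate_uuid() is
 * hypothetical.
 */
static void __maybe_unused example_rotate_uuid(struct drbd_conf *mdev)
{
	if (get_ldev(mdev)) {
		drbd_uuid_new_current(mdev);	/* also syncs the meta data */
		put_ldev(mdev);
	}
}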
3067
3068 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3069 {
3070         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3071                 return;
3072
3073         if (val == 0) {
3074                 drbd_uuid_move_history(mdev);
3075                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3076                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3077         } else {
3078                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3079                 if (bm_uuid)
3080                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3081
3082                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3083         }
3084         drbd_md_mark_dirty(mdev);
3085 }
3086
3087 /**
3088  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3089  * @mdev:       DRBD device.
3090  *
3091  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3092  */
3093 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3094 {
3095         int rv = -EIO;
3096
3097         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3098                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3099                 drbd_md_sync(mdev);
3100                 drbd_bm_set_all(mdev);
3101
3102                 rv = drbd_bm_write(mdev);
3103
3104                 if (!rv) {
3105                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3106                         drbd_md_sync(mdev);
3107                 }
3108
3109                 put_ldev(mdev);
3110         }
3111
3112         return rv;
3113 }
3114
3115 /**
3116  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3117  * @mdev:       DRBD device.
3118  *
3119  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3120  */
3121 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3122 {
3123         int rv = -EIO;
3124
3125         drbd_resume_al(mdev);
3126         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3127                 drbd_bm_clear_all(mdev);
3128                 rv = drbd_bm_write(mdev);
3129                 put_ldev(mdev);
3130         }
3131
3132         return rv;
3133 }
3134
3135 static int w_bitmap_io(struct drbd_work *w, int unused)
3136 {
3137         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3138         struct drbd_conf *mdev = w->mdev;
3139         int rv = -EIO;
3140
3141         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3142
3143         if (get_ldev(mdev)) {
3144                 drbd_bm_lock(mdev, work->why, work->flags);
3145                 rv = work->io_fn(mdev);
3146                 drbd_bm_unlock(mdev);
3147                 put_ldev(mdev);
3148         }
3149
3150         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3151         wake_up(&mdev->misc_wait);
3152
3153         if (work->done)
3154                 work->done(mdev, rv);
3155
3156         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3157         work->why = NULL;
3158         work->flags = 0;
3159
3160         return 0;
3161 }
3162
3163 void drbd_ldev_destroy(struct drbd_conf *mdev)
3164 {
3165         lc_destroy(mdev->resync);
3166         mdev->resync = NULL;
3167         lc_destroy(mdev->act_log);
3168         mdev->act_log = NULL;
3169         __no_warn(local,
3170                 drbd_free_bc(mdev->ldev);
3171                 mdev->ldev = NULL;);
3172
3173         clear_bit(GO_DISKLESS, &mdev->flags);
3174 }
3175
3176 static int w_go_diskless(struct drbd_work *w, int unused)
3177 {
3178         struct drbd_conf *mdev = w->mdev;
3179
3180         D_ASSERT(mdev->state.disk == D_FAILED);
3181         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3182          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3183          * the protected members anymore, though, so once put_ldev reaches zero
3184          * again, it will be safe to free them. */
3185         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3186         return 0;
3187 }
3188
3189 void drbd_go_diskless(struct drbd_conf *mdev)
3190 {
3191         D_ASSERT(mdev->state.disk == D_FAILED);
3192         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3193                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3194 }
3195
3196 /**
3197  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3198  * @mdev:       DRBD device.
3199  * @io_fn:      IO callback to be called when bitmap IO is possible
3200  * @done:       callback to be called after the bitmap IO was performed
3201  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      Bitmap locking flags (enum bm_flag)
3202  *
3203  * While IO on the bitmap is in progress, application IO is frozen; this
3204  * ensures that drbd_set_out_of_sync() cannot be called.  This function MAY
3205  * ONLY be called from worker context.  It MUST NOT be used while a previous
3206  * such work is still pending!
3207  */
3208 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3209                           int (*io_fn)(struct drbd_conf *),
3210                           void (*done)(struct drbd_conf *, int),
3211                           char *why, enum bm_flag flags)
3212 {
3213         D_ASSERT(current == mdev->tconn->worker.task);
3214
3215         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3216         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3217         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3218         if (mdev->bm_io_work.why)
3219                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3220                         why, mdev->bm_io_work.why);
3221
3222         mdev->bm_io_work.io_fn = io_fn;
3223         mdev->bm_io_work.done = done;
3224         mdev->bm_io_work.why = why;
3225         mdev->bm_io_work.flags = flags;
3226
3227         spin_lock_irq(&mdev->tconn->req_lock);
3228         set_bit(BITMAP_IO, &mdev->flags);
3229         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3230                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3231                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3232         }
3233         spin_unlock_irq(&mdev->tconn->req_lock);
3234 }
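
/*
 * Illustrative worker-context sketch: queue a "set all bits and write out"
 * pass over the bitmap (drbd_bmio_clear_n_write() would be the counterpart)
 * and log the result once it completes.  example_bm_io_done() and
 * example_queue_full_sync() are hypothetical; the flag choice here is for
 * illustration only.
 */
static void example_bm_io_done(struct drbd_conf *mdev, int rv)
{
	if (rv)
		dev_err(DEV, "bitmap IO failed: %d\n", rv);
}

static void __maybe_unused example_queue_full_sync(struct drbd_conf *mdev)
{
	/* must run on mdev->tconn->worker, see the D_ASSERT above */
	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, example_bm_io_done,
			     "example full sync", BM_LOCKED_SET_ALLOWED);
}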
3235
3236 /**
3237  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3238  * @mdev:       DRBD device.
3239  * @io_fn:      IO callback to be called when bitmap IO is possible
3240  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      Bitmap locking flags (enum bm_flag)
3241  *
3242  * Freezes application IO while the actual IO operation runs.  This function
3243  * MAY NOT be called from worker context.
3244  */
3245 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3246                 char *why, enum bm_flag flags)
3247 {
3248         int rv;
3249
3250         D_ASSERT(current != mdev->tconn->worker.task);
3251
3252         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3253                 drbd_suspend_io(mdev);
3254
3255         drbd_bm_lock(mdev, why, flags);
3256         rv = io_fn(mdev);
3257         drbd_bm_unlock(mdev);
3258
3259         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3260                 drbd_resume_io(mdev);
3261
3262         return rv;
3263 }
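
/*
 * Illustrative non-worker sketch of the same operation done synchronously.
 * With BM_LOCKED_SET_ALLOWED the suspend/resume of application IO in
 * drbd_bitmap_io() above is skipped, and setting bits while the bitmap is
 * locked is permitted.  example_set_all_out_of_sync() is hypothetical.
 */
static int __maybe_unused example_set_all_out_of_sync(struct drbd_conf *mdev)
{
	return drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
			      "example: mark everything out of sync",
			      BM_LOCKED_SET_ALLOWED);
}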
3264
3265 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3266 {
3267         if ((mdev->ldev->md.flags & flag) != flag) {
3268                 drbd_md_mark_dirty(mdev);
3269                 mdev->ldev->md.flags |= flag;
3270         }
3271 }
3272
3273 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3274 {
3275         if ((mdev->ldev->md.flags & flag) != 0) {
3276                 drbd_md_mark_dirty(mdev);
3277                 mdev->ldev->md.flags &= ~flag;
3278         }
3279 }

3280 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3281 {
3282         return (bdev->md.flags & flag) != 0;
3283 }
3284
3285 static void md_sync_timer_fn(unsigned long data)
3286 {
3287         struct drbd_conf *mdev = (struct drbd_conf *) data;
3288
3289         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3290 }
3291
3292 static int w_md_sync(struct drbd_work *w, int unused)
3293 {
3294         struct drbd_conf *mdev = w->mdev;
3295
3296         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3297 #ifdef DEBUG
3298         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3299                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3300 #endif
3301         drbd_md_sync(mdev);
3302         return 0;
3303 }
3304
3305 const char *cmdname(enum drbd_packet cmd)
3306 {
3307         /* THINK may need to become several global tables
3308          * when we want to support more than
3309          * one PRO_VERSION */
3310         static const char *cmdnames[] = {
3311                 [P_DATA]                = "Data",
3312                 [P_DATA_REPLY]          = "DataReply",
3313                 [P_RS_DATA_REPLY]       = "RSDataReply",
3314                 [P_BARRIER]             = "Barrier",
3315                 [P_BITMAP]              = "ReportBitMap",
3316                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3317                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3318                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3319                 [P_DATA_REQUEST]        = "DataRequest",
3320                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3321                 [P_SYNC_PARAM]          = "SyncParam",
3322                 [P_SYNC_PARAM89]        = "SyncParam89",
3323                 [P_PROTOCOL]            = "ReportProtocol",
3324                 [P_UUIDS]               = "ReportUUIDs",
3325                 [P_SIZES]               = "ReportSizes",
3326                 [P_STATE]               = "ReportState",
3327                 [P_SYNC_UUID]           = "ReportSyncUUID",
3328                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3329                 [P_AUTH_RESPONSE]       = "AuthResponse",
3330                 [P_PING]                = "Ping",
3331                 [P_PING_ACK]            = "PingAck",
3332                 [P_RECV_ACK]            = "RecvAck",
3333                 [P_WRITE_ACK]           = "WriteAck",
3334                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3335                 [P_DISCARD_WRITE]       = "DiscardWrite",
3336                 [P_NEG_ACK]             = "NegAck",
3337                 [P_NEG_DREPLY]          = "NegDReply",
3338                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3339                 [P_BARRIER_ACK]         = "BarrierAck",
3340                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3341                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3342                 [P_OV_REQUEST]          = "OVRequest",
3343                 [P_OV_REPLY]            = "OVReply",
3344                 [P_OV_RESULT]           = "OVResult",
3345                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3346                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3347                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3348                 [P_DELAY_PROBE]         = "DelayProbe",
3349                 [P_OUT_OF_SYNC]         = "OutOfSync",
3350                 [P_RETRY_WRITE]         = "RetryWrite",
3351                 [P_RS_CANCEL]           = "RSCancel",
3352                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3353                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3355                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3356
3357                 /* enum drbd_packet, but not commands - obsoleted flags:
3358                  *      P_MAY_IGNORE
3359                  *      P_MAX_OPT_CMD
3360                  */
3361         };
3362
3363         /* too big for the array: 0xfffX */
3364         if (cmd == P_INITIAL_META)
3365                 return "InitialMeta";
3366         if (cmd == P_INITIAL_DATA)
3367                 return "InitialData";
3368         if (cmd == P_CONNECTION_FEATURES)
3369                 return "ConnectionFeatures";
3370         if (cmd >= ARRAY_SIZE(cmdnames))
3371                 return "Unknown";
3372         return cmdnames[cmd];
3373 }
3374
3375 /**
3376  * drbd_wait_misc  -  wait for a request to make progress
3377  * @mdev:       device associated with the request
3378  * @i:          the struct drbd_interval embedded in struct drbd_request or
3379  *              struct drbd_peer_request
3380  */
3381 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3382 {
3383         struct net_conf *nc;
3384         DEFINE_WAIT(wait);
3385         long timeout;
3386
3387         rcu_read_lock();
3388         nc = rcu_dereference(mdev->tconn->net_conf);
3389         if (!nc) {
3390                 rcu_read_unlock();
3391                 return -ETIMEDOUT;
3392         }
3393         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3394         rcu_read_unlock();
3395
3396         /* Indicate that mdev->misc_wait is to be woken up on progress. */
3397         i->waiting = true;
3398         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3399         spin_unlock_irq(&mdev->tconn->req_lock);
3400         timeout = schedule_timeout(timeout);
3401         finish_wait(&mdev->misc_wait, &wait);
3402         spin_lock_irq(&mdev->tconn->req_lock);
3403         if (!timeout || mdev->state.conn < C_CONNECTED)
3404                 return -ETIMEDOUT;
3405         if (signal_pending(current))
3406                 return -ERESTARTSYS;
3407         return 0;
3408 }
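
/*
 * Illustrative caller sketch: drbd_wait_misc() above temporarily drops
 * tconn->req_lock, so it must be entered with that lock held and its
 * return value checked before relying on any state protected by the lock.
 * example_wait_once() is hypothetical.
 */
static int __maybe_unused example_wait_once(struct drbd_conf *mdev,
					    struct drbd_interval *i)
{
	int err;

	spin_lock_irq(&mdev->tconn->req_lock);
	err = drbd_wait_misc(mdev, i);	/* drops and re-acquires req_lock */
	spin_unlock_irq(&mdev->tconn->req_lock);
	return err;	/* 0, -ETIMEDOUT or -ERESTARTSYS */
}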
3409
3410 #ifdef CONFIG_DRBD_FAULT_INJECTION
3411 /* Fault insertion support including random number generator shamelessly
3412  * stolen from kernel/rcutorture.c */
3413 struct fault_random_state {
3414         unsigned long state;
3415         unsigned long count;
3416 };
3417
3418 #define FAULT_RANDOM_MULT 39916801  /* prime */
3419 #define FAULT_RANDOM_ADD        479001701 /* prime */
3420 #define FAULT_RANDOM_REFRESH 10000
3421
3422 /*
3423  * Crude but fast random-number generator.  Uses a linear congruential
3424  * generator, with occasional help from get_random_bytes().
3425  */
3426 static unsigned long
3427 _drbd_fault_random(struct fault_random_state *rsp)
3428 {
3429         long refresh;
3430
3431         if (!rsp->count--) {
3432                 get_random_bytes(&refresh, sizeof(refresh));
3433                 rsp->state += refresh;
3434                 rsp->count = FAULT_RANDOM_REFRESH;
3435         }
3436         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3437         return swahw32(rsp->state);
3438 }
3439
3440 static char *
3441 _drbd_fault_str(unsigned int type)
{
3442         static char *_faults[] = {
3443                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3444                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3445                 [DRBD_FAULT_RS_WR] = "Resync write",
3446                 [DRBD_FAULT_RS_RD] = "Resync read",
3447                 [DRBD_FAULT_DT_WR] = "Data write",
3448                 [DRBD_FAULT_DT_RD] = "Data read",
3449                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3450                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3451                 [DRBD_FAULT_AL_EE] = "EE allocation",
3452                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3453         };
3454
3455         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3456 }
3457
3458 unsigned int
3459 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3460 {
3461         static struct fault_random_state rrs = {0, 0};
3462
3463         unsigned int ret = (
3464                 (fault_devs == 0 ||
3465                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3466                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3467
3468         if (ret) {
3469                 fault_count++;
3470
3471                 if (__ratelimit(&drbd_ratelimit_state))
3472                         dev_warn(DEV, "***Simulating %s failure\n",
3473                                 _drbd_fault_str(type));
3474         }
3475
3476         return ret;
3477 }
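
/*
 * Illustrative sketch of a fault injection check site (the real per-site
 * wrapper lives in drbd_int.h).  example_should_fail_md_write() is
 * hypothetical; fault_rate, _drbd_insert_fault() and DRBD_FAULT_MD_WR are
 * real.  A nonzero return means "pretend this meta data write failed".
 */
static int __maybe_unused example_should_fail_md_write(struct drbd_conf *mdev)
{
	return fault_rate && _drbd_insert_fault(mdev, DRBD_FAULT_MD_WR);
}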
3478 #endif
3479
3480 const char *drbd_buildtag(void)
3481 {
3482         /* When DRBD is built from out-of-tree sources, this holds a
3483            reference to the git hash of the source code. */
3484
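	/* Note: the leading '\0' keeps buildtag looking unset until the first
	 * call; if drbd is built in (THIS_MODULE is NULL) or the kernel has
	 * no module support, the 'b' written below turns the remainder into
	 * the literal string "built-in". */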
3485         static char buildtag[38] = "\0uilt-in";
3486
3487         if (buildtag[0] == 0) {
3488 #ifdef CONFIG_MODULES
3489                 if (THIS_MODULE != NULL)
3490                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3491                 else
3492 #endif
3493                         buildtag[0] = 'b';
3494         }
3495
3496         return buildtag;
3497 }
3498
3499 module_init(drbd_init)
3500 module_exit(drbd_cleanup)
3501
3502 EXPORT_SYMBOL(drbd_conn_str);
3503 EXPORT_SYMBOL(drbd_role_str);
3504 EXPORT_SYMBOL(drbd_disk_str);
3505 EXPORT_SYMBOL(drbd_set_st_err_str);