1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136          Note: This is a singly linked list, the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
198  * attached.
199  */
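/*
 * Illustrative sketch of the structure described above (not authoritative;
 * derived from tl_init(), _tl_add_barrier() and tl_release() below):
 *
 *   tconn->oldest_tle --> epoch --> epoch --> ... --> epoch <-- tconn->newest_tle
 *                           |         |                 |
 *                        requests  requests          requests
 *                   (each a circular doubly linked list of struct drbd_request)
 *
 * _tl_add_barrier() appends a new epoch at the newest_tle end; tl_release()
 * frees or recycles epochs from the oldest_tle end once the matching barrier
 * ack has been received.
 */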
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218
219         return 1;
220 }
221
222 static void tl_cleanup(struct drbd_tconn *tconn)
223 {
224         if (tconn->oldest_tle != tconn->newest_tle)
225                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
226         if (!list_empty(&tconn->out_of_sequence_requests))
227                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
228         kfree(tconn->oldest_tle);
229         tconn->oldest_tle = NULL;
230         kfree(tconn->unused_spare_tle);
231         tconn->unused_spare_tle = NULL;
232 }
233
234 /**
235  * _tl_add_barrier() - Adds a barrier to the transfer log
236  * @mdev:       DRBD device.
237  * @new:        Barrier to be added before the current head of the TL.
238  *
239  * The caller must hold the req_lock.
240  */
241 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
242 {
243         struct drbd_tl_epoch *newest_before;
244
245         INIT_LIST_HEAD(&new->requests);
246         INIT_LIST_HEAD(&new->w.list);
247         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
248         new->next = NULL;
249         new->n_writes = 0;
250
251         newest_before = tconn->newest_tle;
252         /* never send a barrier number == 0, because that is special-cased
253          * when using TCQ for our write ordering code */
254         new->br_number = (newest_before->br_number+1) ?: 1;
255         if (tconn->newest_tle != new) {
256                 tconn->newest_tle->next = new;
257                 tconn->newest_tle = new;
258         }
259 }
260
261 /**
262  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
263  * @mdev:       DRBD device.
264  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
265  * @set_size:   Expected number of requests before that barrier.
266  *
267  * In case the passed barrier_nr or set_size does not match the oldest
268  * &struct drbd_tl_epoch objects this function will cause a termination
269  * of the connection.
270  */
271 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
272                 unsigned int set_size)
273 {
274         struct drbd_conf *mdev;
275         struct drbd_tl_epoch *b, *nob; /* next old barrier */
276         struct list_head *le, *tle;
277         struct drbd_request *r;
278
279         spin_lock_irq(&tconn->req_lock);
280
281         b = tconn->oldest_tle;
282
283         /* first some paranoia code */
284         if (b == NULL) {
285                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
286                          barrier_nr);
287                 goto bail;
288         }
289         if (b->br_number != barrier_nr) {
290                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
291                          barrier_nr, b->br_number);
292                 goto bail;
293         }
294         if (b->n_writes != set_size) {
295                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
296                          barrier_nr, set_size, b->n_writes);
297                 goto bail;
298         }
299
300         /* Clean up list of requests processed during current epoch */
301         list_for_each_safe(le, tle, &b->requests) {
302                 r = list_entry(le, struct drbd_request, tl_requests);
303                 _req_mod(r, BARRIER_ACKED);
304         }
305         /* There could be requests on the list waiting for completion
306            of the write to the local disk. To avoid corruption of the
307            slab's data structures we have to remove the list's head.
308
309            Also there could have been a barrier ack out of sequence, overtaking
310            the write acks - which would be a bug and would violate write ordering.
311            To not deadlock in case we lose connection while such requests are
312            still pending, we need some way to find them for the
313            _req_mod(CONNECTION_LOST_WHILE_PENDING).
314
315            These have been list_move'd to the out_of_sequence_requests list in
316            _req_mod(, BARRIER_ACKED) above.
317            */
318         list_del_init(&b->requests);
319         mdev = b->w.mdev;
320
321         nob = b->next;
322         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
323                 _tl_add_barrier(tconn, b);
324                 if (nob)
325                         tconn->oldest_tle = nob;
326                 /* if nob == NULL b was the only barrier, and becomes the new
327                    barrier. Therefore tconn->oldest_tle already points to b */
328         } else {
329                 D_ASSERT(nob != NULL);
330                 tconn->oldest_tle = nob;
331                 kfree(b);
332         }
333
334         spin_unlock_irq(&tconn->req_lock);
335         dec_ap_pending(mdev);
336
337         return;
338
339 bail:
340         spin_unlock_irq(&tconn->req_lock);
341         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
342 }
343
344
345 /**
346  * _tl_restart() - Walks the transfer log, and applies an action to all requests
347  * @mdev:       DRBD device.
348  * @what:       The action/event to perform with all request objects
349  *
350  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
351  * RESTART_FROZEN_DISK_IO.
352  */
353 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
354 {
355         struct drbd_tl_epoch *b, *tmp, **pn;
356         struct list_head *le, *tle, carry_reads;
357         struct drbd_request *req;
358         int rv, n_writes, n_reads;
359
360         b = tconn->oldest_tle;
361         pn = &tconn->oldest_tle;
362         while (b) {
363                 n_writes = 0;
364                 n_reads = 0;
365                 INIT_LIST_HEAD(&carry_reads);
366                 list_for_each_safe(le, tle, &b->requests) {
367                         req = list_entry(le, struct drbd_request, tl_requests);
368                         rv = _req_mod(req, what);
369
370                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
371                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
372                 }
373                 tmp = b->next;
374
375                 if (n_writes) {
376                         if (what == RESEND) {
377                                 b->n_writes = n_writes;
378                                 if (b->w.cb == NULL) {
379                                         b->w.cb = w_send_barrier;
380                                         inc_ap_pending(b->w.mdev);
381                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
382                                 }
383
384                                 drbd_queue_work(&tconn->data.work, &b->w);
385                         }
386                         pn = &b->next;
387                 } else {
388                         if (n_reads)
389                                 list_add(&carry_reads, &b->requests);
390                         /* there could still be requests on that ring list,
391                          * in case local io is still pending */
392                         list_del(&b->requests);
393
394                         /* dec_ap_pending corresponding to queue_barrier.
395                          * the newest barrier may not have been queued yet,
396                          * in which case w.cb is still NULL. */
397                         if (b->w.cb != NULL)
398                                 dec_ap_pending(b->w.mdev);
399
400                         if (b == tconn->newest_tle) {
401                                 /* recycle, but reinit! */
402                                 if (tmp != NULL)
403                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
404                                 INIT_LIST_HEAD(&b->requests);
405                                 list_splice(&carry_reads, &b->requests);
406                                 INIT_LIST_HEAD(&b->w.list);
407                                 b->w.cb = NULL;
408                                 b->br_number = net_random();
409                                 b->n_writes = 0;
410
411                                 *pn = b;
412                                 break;
413                         }
414                         *pn = tmp;
415                         kfree(b);
416                 }
417                 b = tmp;
418                 list_splice(&carry_reads, &b->requests);
419         }
420 }
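/*
 * In short: _tl_restart() walks the epoch list from oldest_tle towards
 * newest_tle and applies @what to every request it finds.  An epoch that
 * still has writes (the RESEND case) keeps its place and gets its barrier
 * work re-queued; an epoch without writes is unlinked and freed, carrying
 * any remaining reads over to the next epoch.  The newest epoch is never
 * freed but reinitialized, so the transfer log always keeps at least one
 * epoch.  (Descriptive summary of the code above, not a specification.)
 */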
421
422
423 /**
424  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
425  * @mdev:       DRBD device.
426  *
427  * This is called after the connection to the peer was lost. The storage covered
428  * by the requests on the transfer log gets marked as out of sync. Called from the
429  * receiver thread and the worker thread.
430  */
431 void tl_clear(struct drbd_tconn *tconn)
432 {
433         struct drbd_conf *mdev;
434         struct list_head *le, *tle;
435         struct drbd_request *r;
436         int vnr;
437
438         spin_lock_irq(&tconn->req_lock);
439
440         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
441
442         /* we expect this list to be empty. */
443         if (!list_empty(&tconn->out_of_sequence_requests))
444                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
445
446         /* but just in case, clean it up anyway! */
447         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
448                 r = list_entry(le, struct drbd_request, tl_requests);
449                 /* It would be nice to complete outside of spinlock.
450                  * But this is easier for now. */
451                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
452         }
453
454         /* ensure bit indicating barrier is required is clear */
455         rcu_read_lock();
456         idr_for_each_entry(&tconn->volumes, mdev, vnr)
457                 clear_bit(CREATE_BARRIER, &mdev->flags);
458         rcu_read_unlock();
459
460         spin_unlock_irq(&tconn->req_lock);
461 }
462
463 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
464 {
465         spin_lock_irq(&tconn->req_lock);
466         _tl_restart(tconn, what);
467         spin_unlock_irq(&tconn->req_lock);
468 }
469
470 static int drbd_thread_setup(void *arg)
471 {
472         struct drbd_thread *thi = (struct drbd_thread *) arg;
473         struct drbd_tconn *tconn = thi->tconn;
474         unsigned long flags;
475         int retval;
476
477         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
478                  thi->name[0], thi->tconn->name);
479
480 restart:
481         retval = thi->function(thi);
482
483         spin_lock_irqsave(&thi->t_lock, flags);
484
485         /* if the receiver has been "EXITING", the last thing it did
486          * was set the conn state to "StandAlone",
487          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
488          * and receiver thread will be "started".
489          * drbd_thread_start needs to set "RESTARTING" in that case.
490          * t_state check and assignment needs to be within the same spinlock,
491          * so either thread_start sees EXITING, and can remap to RESTARTING,
492  * or thread_start sees NONE, and can proceed as normal.
493          */
494
495         if (thi->t_state == RESTARTING) {
496                 conn_info(tconn, "Restarting %s thread\n", thi->name);
497                 thi->t_state = RUNNING;
498                 spin_unlock_irqrestore(&thi->t_lock, flags);
499                 goto restart;
500         }
501
502         thi->task = NULL;
503         thi->t_state = NONE;
504         smp_mb();
505         complete_all(&thi->stop);
506         spin_unlock_irqrestore(&thi->t_lock, flags);
507
508         conn_info(tconn, "Terminating %s\n", current->comm);
509
510         /* Release mod reference taken when thread was started */
511
512         kref_put(&tconn->kref, &conn_destroy);
513         module_put(THIS_MODULE);
514         return retval;
515 }
516
517 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
518                              int (*func) (struct drbd_thread *), char *name)
519 {
520         spin_lock_init(&thi->t_lock);
521         thi->task    = NULL;
522         thi->t_state = NONE;
523         thi->function = func;
524         thi->tconn = tconn;
525         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
526 }
527
528 int drbd_thread_start(struct drbd_thread *thi)
529 {
530         struct drbd_tconn *tconn = thi->tconn;
531         struct task_struct *nt;
532         unsigned long flags;
533
534         /* is used from state engine doing drbd_thread_stop_nowait,
535          * while holding the req lock irqsave */
536         spin_lock_irqsave(&thi->t_lock, flags);
537
538         switch (thi->t_state) {
539         case NONE:
540                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
541                          thi->name, current->comm, current->pid);
542
543                 /* Get ref on module for thread - this is released when thread exits */
544                 if (!try_module_get(THIS_MODULE)) {
545                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
546                         spin_unlock_irqrestore(&thi->t_lock, flags);
547                         return false;
548                 }
549
550                 kref_get(&thi->tconn->kref);
551
552                 init_completion(&thi->stop);
553                 thi->reset_cpu_mask = 1;
554                 thi->t_state = RUNNING;
555                 spin_unlock_irqrestore(&thi->t_lock, flags);
556                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
557
558                 nt = kthread_create(drbd_thread_setup, (void *) thi,
559                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
560
561                 if (IS_ERR(nt)) {
562                         conn_err(tconn, "Couldn't start thread\n");
563
564                         kref_put(&tconn->kref, &conn_destroy);
565                         module_put(THIS_MODULE);
566                         return false;
567                 }
568                 spin_lock_irqsave(&thi->t_lock, flags);
569                 thi->task = nt;
570                 thi->t_state = RUNNING;
571                 spin_unlock_irqrestore(&thi->t_lock, flags);
572                 wake_up_process(nt);
573                 break;
574         case EXITING:
575                 thi->t_state = RESTARTING;
576                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
577                                 thi->name, current->comm, current->pid);
578                 /* fall through */
579         case RUNNING:
580         case RESTARTING:
581         default:
582                 spin_unlock_irqrestore(&thi->t_lock, flags);
583                 break;
584         }
585
586         return true;
587 }
588
589
590 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
591 {
592         unsigned long flags;
593
594         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
595
596         /* may be called from state engine, holding the req lock irqsave */
597         spin_lock_irqsave(&thi->t_lock, flags);
598
599         if (thi->t_state == NONE) {
600                 spin_unlock_irqrestore(&thi->t_lock, flags);
601                 if (restart)
602                         drbd_thread_start(thi);
603                 return;
604         }
605
606         if (thi->t_state != ns) {
607                 if (thi->task == NULL) {
608                         spin_unlock_irqrestore(&thi->t_lock, flags);
609                         return;
610                 }
611
612                 thi->t_state = ns;
613                 smp_mb();
614                 init_completion(&thi->stop);
615                 if (thi->task != current)
616                         force_sig(DRBD_SIGKILL, thi->task);
617         }
618
619         spin_unlock_irqrestore(&thi->t_lock, flags);
620
621         if (wait)
622                 wait_for_completion(&thi->stop);
623 }
624
625 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
626 {
627         struct drbd_thread *thi =
628                 task == tconn->receiver.task ? &tconn->receiver :
629                 task == tconn->asender.task  ? &tconn->asender :
630                 task == tconn->worker.task   ? &tconn->worker : NULL;
631
632         return thi;
633 }
634
635 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
636 {
637         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
638         return thi ? thi->name : task->comm;
639 }
640
641 int conn_lowest_minor(struct drbd_tconn *tconn)
642 {
643         struct drbd_conf *mdev;
644         int vnr = 0, m;
645
646         rcu_read_lock();
647         mdev = idr_get_next(&tconn->volumes, &vnr);
648         m = mdev ? mdev_to_minor(mdev) : -1;
649         rcu_read_unlock();
650
651         return m;
652 }
653
654 #ifdef CONFIG_SMP
655 /**
656  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
657  * @mdev:       DRBD device.
658  *
659  * Forces all threads of a device onto the same CPU. This is beneficial for
660  * DRBD's performance. May be overridden by the user's configuration.
661  */
662 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
663 {
664         int ord, cpu;
665
666         /* user override. */
667         if (cpumask_weight(tconn->cpu_mask))
668                 return;
669
670         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
671         for_each_online_cpu(cpu) {
672                 if (ord-- == 0) {
673                         cpumask_set_cpu(cpu, tconn->cpu_mask);
674                         return;
675                 }
676         }
677         /* should not be reached */
678         cpumask_setall(tconn->cpu_mask);
679 }
680
681 /**
682  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
683  * @mdev:       DRBD device.
684  * @thi:        drbd_thread object
685  *
686  * Call this in the "main loop" of _all_ threads; no need for any mutex, current won't die
687  * prematurely.
688  */
689 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
690 {
691         struct task_struct *p = current;
692
693         if (!thi->reset_cpu_mask)
694                 return;
695         thi->reset_cpu_mask = 0;
696         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
697 }
698 #endif
699
700 /**
701  * drbd_header_size  -  size of a packet header
702  *
703  * The header size is a multiple of 8, so any payload following the header is
704  * word aligned on 64-bit architectures.  (The bitmap send and receive code
705  * relies on this.)
706  */
707 unsigned int drbd_header_size(struct drbd_tconn *tconn)
708 {
709         if (tconn->agreed_pro_version >= 100) {
710                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
711                 return sizeof(struct p_header100);
712         } else {
713                 BUILD_BUG_ON(sizeof(struct p_header80) !=
714                              sizeof(struct p_header95));
715                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
716                 return sizeof(struct p_header80);
717         }
718 }
719
720 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
721 {
722         h->magic   = cpu_to_be32(DRBD_MAGIC);
723         h->command = cpu_to_be16(cmd);
724         h->length  = cpu_to_be16(size);
725         return sizeof(struct p_header80);
726 }
727
728 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
729 {
730         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
731         h->command = cpu_to_be16(cmd);
732         h->length = cpu_to_be32(size);
733         return sizeof(struct p_header95);
734 }
735
736 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
737                                       int size, int vnr)
738 {
739         h->magic = cpu_to_be32(DRBD_MAGIC_100);
740         h->volume = cpu_to_be16(vnr);
741         h->command = cpu_to_be16(cmd);
742         h->length = cpu_to_be32(size);
743         h->pad = 0;
744         return sizeof(struct p_header100);
745 }
746
747 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
748                                    void *buffer, enum drbd_packet cmd, int size)
749 {
750         if (tconn->agreed_pro_version >= 100)
751                 return prepare_header100(buffer, cmd, size, vnr);
752         else if (tconn->agreed_pro_version >= 95 &&
753                  size > DRBD_MAX_SIZE_H80_PACKET)
754                 return prepare_header95(buffer, cmd, size);
755         else
756                 return prepare_header80(buffer, cmd, size);
757 }
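/*
 * Header selection, as implemented by prepare_header() above:
 *
 *   agreed_pro_version >= 100                               -> struct p_header100
 *                                                              (carries the volume number)
 *   agreed_pro_version >= 95 && size > DRBD_MAX_SIZE_H80_PACKET
 *                                                           -> struct p_header95
 *                                                              (32 bit length field)
 *   otherwise                                               -> struct p_header80
 *                                                              (16 bit length field)
 *
 * (Summary comment only; the field widths follow from the prepare_header*()
 * helpers above.)
 */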
758
759 static void *__conn_prepare_command(struct drbd_tconn *tconn,
760                                     struct drbd_socket *sock)
761 {
762         if (!sock->socket)
763                 return NULL;
764         return sock->sbuf + drbd_header_size(tconn);
765 }
766
767 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
768 {
769         void *p;
770
771         mutex_lock(&sock->mutex);
772         p = __conn_prepare_command(tconn, sock);
773         if (!p)
774                 mutex_unlock(&sock->mutex);
775
776         return p;
777 }
778
779 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
780 {
781         return conn_prepare_command(mdev->tconn, sock);
782 }
783
784 static int __send_command(struct drbd_tconn *tconn, int vnr,
785                           struct drbd_socket *sock, enum drbd_packet cmd,
786                           unsigned int header_size, void *data,
787                           unsigned int size)
788 {
789         int msg_flags;
790         int err;
791
792         /*
793          * Called with @data == NULL and the size of the data blocks in @size
794          * for commands that send data blocks.  For those commands, omit the
795          * MSG_MORE flag: this will increase the likelihood that data blocks
796          * which are page aligned on the sender will end up page aligned on the
797          * receiver.
798          */
799         msg_flags = data ? MSG_MORE : 0;
800
801         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
802                                       header_size + size);
803         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
804                             msg_flags);
805         if (data && !err)
806                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
807         return err;
808 }
809
810 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
811                                enum drbd_packet cmd, unsigned int header_size,
812                                void *data, unsigned int size)
813 {
814         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
815 }
816
817 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
818                       enum drbd_packet cmd, unsigned int header_size,
819                       void *data, unsigned int size)
820 {
821         int err;
822
823         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
824         mutex_unlock(&sock->mutex);
825         return err;
826 }
827
828 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
829                       enum drbd_packet cmd, unsigned int header_size,
830                       void *data, unsigned int size)
831 {
832         int err;
833
834         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
835                              data, size);
836         mutex_unlock(&sock->mutex);
837         return err;
838 }
839
840 int drbd_send_ping(struct drbd_tconn *tconn)
841 {
842         struct drbd_socket *sock;
843
844         sock = &tconn->meta;
845         if (!conn_prepare_command(tconn, sock))
846                 return -EIO;
847         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
848 }
849
850 int drbd_send_ping_ack(struct drbd_tconn *tconn)
851 {
852         struct drbd_socket *sock;
853
854         sock = &tconn->meta;
855         if (!conn_prepare_command(tconn, sock))
856                 return -EIO;
857         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
858 }
859
860 int drbd_send_sync_param(struct drbd_conf *mdev)
861 {
862         struct drbd_socket *sock;
863         struct p_rs_param_95 *p;
864         int size;
865         const int apv = mdev->tconn->agreed_pro_version;
866         enum drbd_packet cmd;
867         struct net_conf *nc;
868         struct disk_conf *dc;
869
870         sock = &mdev->tconn->data;
871         p = drbd_prepare_command(mdev, sock);
872         if (!p)
873                 return -EIO;
874
875         rcu_read_lock();
876         nc = rcu_dereference(mdev->tconn->net_conf);
877
878         size = apv <= 87 ? sizeof(struct p_rs_param)
879                 : apv == 88 ? sizeof(struct p_rs_param)
880                         + strlen(nc->verify_alg) + 1
881                 : apv <= 94 ? sizeof(struct p_rs_param_89)
882                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
883
884         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
885
886         /* initialize verify_alg and csums_alg */
887         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
888
889         if (get_ldev(mdev)) {
890                 dc = rcu_dereference(mdev->ldev->disk_conf);
891                 p->resync_rate = cpu_to_be32(dc->resync_rate);
892                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
893                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
894                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
895                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
896                 put_ldev(mdev);
897         } else {
898                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
899                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
900                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
901                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
902                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
903         }
904
905         if (apv >= 88)
906                 strcpy(p->verify_alg, nc->verify_alg);
907         if (apv >= 89)
908                 strcpy(p->csums_alg, nc->csums_alg);
909         rcu_read_unlock();
910
911         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
912 }
913
914 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
915 {
916         struct drbd_socket *sock;
917         struct p_protocol *p;
918         struct net_conf *nc;
919         int size, cf;
920
921         sock = &tconn->data;
922         p = __conn_prepare_command(tconn, sock);
923         if (!p)
924                 return -EIO;
925
926         rcu_read_lock();
927         nc = rcu_dereference(tconn->net_conf);
928
929         if (nc->dry_run && tconn->agreed_pro_version < 92) {
930                 rcu_read_unlock();
931                 mutex_unlock(&sock->mutex);
932                 conn_err(tconn, "--dry-run is not supported by peer");
933                 return -EOPNOTSUPP;
934         }
935
936         size = sizeof(*p);
937         if (tconn->agreed_pro_version >= 87)
938                 size += strlen(nc->integrity_alg) + 1;
939
940         p->protocol      = cpu_to_be32(nc->wire_protocol);
941         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
942         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
943         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
944         p->two_primaries = cpu_to_be32(nc->two_primaries);
945         cf = 0;
946         if (nc->discard_my_data)
947                 cf |= CF_DISCARD_MY_DATA;
948         if (nc->dry_run)
949                 cf |= CF_DRY_RUN;
950         p->conn_flags    = cpu_to_be32(cf);
951
952         if (tconn->agreed_pro_version >= 87)
953                 strcpy(p->integrity_alg, nc->integrity_alg);
954         rcu_read_unlock();
955
956         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
957 }
958
959 int drbd_send_protocol(struct drbd_tconn *tconn)
960 {
961         int err;
962
963         mutex_lock(&tconn->data.mutex);
964         err = __drbd_send_protocol(tconn, P_PROTOCOL);
965         mutex_unlock(&tconn->data.mutex);
966
967         return err;
968 }
969
970 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
971 {
972         struct drbd_socket *sock;
973         struct p_uuids *p;
974         int i;
975
976         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
977                 return 0;
978
979         sock = &mdev->tconn->data;
980         p = drbd_prepare_command(mdev, sock);
981         if (!p) {
982                 put_ldev(mdev);
983                 return -EIO;
984         }
985         for (i = UI_CURRENT; i < UI_SIZE; i++)
986                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
987
988         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
989         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
990         rcu_read_lock();
991         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
992         rcu_read_unlock();
993         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
994         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
995         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
996
997         put_ldev(mdev);
998         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
999 }
1000
1001 int drbd_send_uuids(struct drbd_conf *mdev)
1002 {
1003         return _drbd_send_uuids(mdev, 0);
1004 }
1005
1006 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1007 {
1008         return _drbd_send_uuids(mdev, 8);
1009 }
1010
1011 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1012 {
1013         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1014                 u64 *uuid = mdev->ldev->md.uuid;
1015                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1016                      text,
1017                      (unsigned long long)uuid[UI_CURRENT],
1018                      (unsigned long long)uuid[UI_BITMAP],
1019                      (unsigned long long)uuid[UI_HISTORY_START],
1020                      (unsigned long long)uuid[UI_HISTORY_END]);
1021                 put_ldev(mdev);
1022         } else {
1023                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1024                                 text,
1025                                 (unsigned long long)mdev->ed_uuid);
1026         }
1027 }
1028
1029 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1030 {
1031         struct drbd_socket *sock;
1032         struct p_rs_uuid *p;
1033         u64 uuid;
1034
1035         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1036
1037         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1038         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1039         drbd_print_uuids(mdev, "updated sync UUID");
1040         drbd_md_sync(mdev);
1041
1042         sock = &mdev->tconn->data;
1043         p = drbd_prepare_command(mdev, sock);
1044         if (p) {
1045                 p->uuid = cpu_to_be64(uuid);
1046                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1047         }
1048 }
1049
1050 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1051 {
1052         struct drbd_socket *sock;
1053         struct p_sizes *p;
1054         sector_t d_size, u_size;
1055         int q_order_type, max_bio_size;
1056
1057         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1058                 D_ASSERT(mdev->ldev->backing_bdev);
1059                 d_size = drbd_get_max_capacity(mdev->ldev);
1060                 rcu_read_lock();
1061                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1062                 rcu_read_unlock();
1063                 q_order_type = drbd_queue_order_type(mdev);
1064                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1065                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1066                 put_ldev(mdev);
1067         } else {
1068                 d_size = 0;
1069                 u_size = 0;
1070                 q_order_type = QUEUE_ORDERED_NONE;
1071                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1072         }
1073
1074         sock = &mdev->tconn->data;
1075         p = drbd_prepare_command(mdev, sock);
1076         if (!p)
1077                 return -EIO;
1078         p->d_size = cpu_to_be64(d_size);
1079         p->u_size = cpu_to_be64(u_size);
1080         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1081         p->max_bio_size = cpu_to_be32(max_bio_size);
1082         p->queue_order_type = cpu_to_be16(q_order_type);
1083         p->dds_flags = cpu_to_be16(flags);
1084         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1085 }
1086
1087 /**
1088  * drbd_send_state() - Sends the drbd state to the peer
1089  * @mdev:       DRBD device.
1090  */
1091 int drbd_send_state(struct drbd_conf *mdev)
1092 {
1093         struct drbd_socket *sock;
1094         struct p_state *p;
1095
1096         sock = &mdev->tconn->data;
1097         p = drbd_prepare_command(mdev, sock);
1098         if (!p)
1099                 return -EIO;
1100         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1101         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1102 }
1103
1104 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1105 {
1106         struct drbd_socket *sock;
1107         struct p_req_state *p;
1108
1109         sock = &mdev->tconn->data;
1110         p = drbd_prepare_command(mdev, sock);
1111         if (!p)
1112                 return -EIO;
1113         p->mask = cpu_to_be32(mask.i);
1114         p->val = cpu_to_be32(val.i);
1115         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1116
1117 }
1118
1119 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1120 {
1121         enum drbd_packet cmd;
1122         struct drbd_socket *sock;
1123         struct p_req_state *p;
1124
1125         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1126         sock = &tconn->data;
1127         p = conn_prepare_command(tconn, sock);
1128         if (!p)
1129                 return -EIO;
1130         p->mask = cpu_to_be32(mask.i);
1131         p->val = cpu_to_be32(val.i);
1132         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1133 }
1134
1135 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1136 {
1137         struct drbd_socket *sock;
1138         struct p_req_state_reply *p;
1139
1140         sock = &mdev->tconn->meta;
1141         p = drbd_prepare_command(mdev, sock);
1142         if (p) {
1143                 p->retcode = cpu_to_be32(retcode);
1144                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1145         }
1146 }
1147
1148 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1149 {
1150         struct drbd_socket *sock;
1151         struct p_req_state_reply *p;
1152         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1153
1154         sock = &tconn->meta;
1155         p = conn_prepare_command(tconn, sock);
1156         if (p) {
1157                 p->retcode = cpu_to_be32(retcode);
1158                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1159         }
1160 }
1161
1162 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1163 {
1164         BUG_ON(code & ~0xf);
1165         p->encoding = (p->encoding & ~0xf) | code;
1166 }
1167
1168 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1169 {
1170         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1171 }
1172
1173 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1174 {
1175         BUG_ON(n & ~0x7);
1176         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1177 }
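/*
 * Layout of p_compressed_bm->encoding as used by the three helpers above
 * (descriptive comment, derived from the bit masks in the code):
 *
 *   bit  7      : value of the first run           (dcbp_set_start)
 *   bits 6 .. 4 : pad bits in the last code byte   (dcbp_set_pad_bits)
 *   bits 3 .. 0 : bitmap encoding code,
 *                 e.g. RLE_VLI_Bits                (dcbp_set_code)
 *
 * Note that dcbp_set_pad_bits() clears the low nibble as a side effect, so
 * dcbp_set_code() must be (and is) called after it; see
 * send_bitmap_rle_or_plain() below.
 */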
1178
1179 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1180                          struct p_compressed_bm *p,
1181                          unsigned int size,
1182                          struct bm_xfer_ctx *c)
1183 {
1184         struct bitstream bs;
1185         unsigned long plain_bits;
1186         unsigned long tmp;
1187         unsigned long rl;
1188         unsigned len;
1189         unsigned toggle;
1190         int bits, use_rle;
1191
1192         /* may we use this feature? */
1193         rcu_read_lock();
1194         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1195         rcu_read_unlock();
1196         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1197                 return 0;
1198
1199         if (c->bit_offset >= c->bm_bits)
1200                 return 0; /* nothing to do. */
1201
1202         /* use at most this many bytes */
1203         bitstream_init(&bs, p->code, size, 0);
1204         memset(p->code, 0, size);
1205         /* plain bits covered in this code string */
1206         plain_bits = 0;
1207
1208         /* p->encoding & 0x80 stores whether the first run length is set.
1209          * bit offset is implicit.
1210          * start with toggle == 2 to be able to tell the first iteration */
1211         toggle = 2;
1212
1213         /* see how many plain bits we can stuff into one packet
1214          * using RLE and VLI. */
1215         do {
1216                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1217                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1218                 if (tmp == -1UL)
1219                         tmp = c->bm_bits;
1220                 rl = tmp - c->bit_offset;
1221
1222                 if (toggle == 2) { /* first iteration */
1223                         if (rl == 0) {
1224                                 /* the first checked bit was set,
1225                                  * store start value, */
1226                                 dcbp_set_start(p, 1);
1227                                 /* but skip encoding of zero run length */
1228                                 toggle = !toggle;
1229                                 continue;
1230                         }
1231                         dcbp_set_start(p, 0);
1232                 }
1233
1234                 /* paranoia: catch zero runlength.
1235                  * can only happen if bitmap is modified while we scan it. */
1236                 if (rl == 0) {
1237                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1238                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1239                         return -1;
1240                 }
1241
1242                 bits = vli_encode_bits(&bs, rl);
1243                 if (bits == -ENOBUFS) /* buffer full */
1244                         break;
1245                 if (bits <= 0) {
1246                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1247                         return 0;
1248                 }
1249
1250                 toggle = !toggle;
1251                 plain_bits += rl;
1252                 c->bit_offset = tmp;
1253         } while (c->bit_offset < c->bm_bits);
1254
1255         len = bs.cur.b - p->code + !!bs.cur.bit;
1256
1257         if (plain_bits < (len << 3)) {
1258                 /* incompressible with this method.
1259                  * we need to rewind both word and bit position. */
1260                 c->bit_offset -= plain_bits;
1261                 bm_xfer_ctx_bit_to_word_offset(c);
1262                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1263                 return 0;
1264         }
1265
1266         /* RLE + VLI was able to compress it just fine.
1267          * update c->word_offset. */
1268         bm_xfer_ctx_bit_to_word_offset(c);
1269
1270         /* store pad_bits */
1271         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1272
1273         return len;
1274 }
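/*
 * The "plain_bits < (len << 3)" check above is the break-even test: the code
 * string costs len bytes, i.e. len * 8 bits on the wire, so it is only worth
 * sending if it describes more than len * 8 plain bitmap bits.  For example
 * (illustrative numbers only), 1000 code bytes covering just 6000 plain bits
 * would be rejected, since 1000 * 8 = 8000 > 6000, and the caller falls back
 * to sending a plain bitmap packet instead.
 */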
1275
1276 /**
1277  * send_bitmap_rle_or_plain
1278  *
1279  * Return 0 when done, 1 when another iteration is needed, and a negative error
1280  * code upon failure.
1281  */
1282 static int
1283 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1284 {
1285         struct drbd_socket *sock = &mdev->tconn->data;
1286         unsigned int header_size = drbd_header_size(mdev->tconn);
1287         struct p_compressed_bm *p = sock->sbuf + header_size;
1288         int len, err;
1289
1290         len = fill_bitmap_rle_bits(mdev, p,
1291                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1292         if (len < 0)
1293                 return -EIO;
1294
1295         if (len) {
1296                 dcbp_set_code(p, RLE_VLI_Bits);
1297                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1298                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1299                                      NULL, 0);
1300                 c->packets[0]++;
1301                 c->bytes[0] += header_size + sizeof(*p) + len;
1302
1303                 if (c->bit_offset >= c->bm_bits)
1304                         len = 0; /* DONE */
1305         } else {
1306                 /* was not compressible.
1307                  * send a buffer full of plain text bits instead. */
1308                 unsigned int data_size;
1309                 unsigned long num_words;
1310                 unsigned long *p = sock->sbuf + header_size;
1311
1312                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1313                 num_words = min_t(size_t, data_size / sizeof(*p),
1314                                   c->bm_words - c->word_offset);
1315                 len = num_words * sizeof(*p);
1316                 if (len)
1317                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1318                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1319                 c->word_offset += num_words;
1320                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1321
1322                 c->packets[1]++;
1323                 c->bytes[1] += header_size + len;
1324
1325                 if (c->bit_offset > c->bm_bits)
1326                         c->bit_offset = c->bm_bits;
1327         }
1328         if (!err) {
1329                 if (len == 0) {
1330                         INFO_bm_xfer_stats(mdev, "send", c);
1331                         return 0;
1332                 } else
1333                         return 1;
1334         }
1335         return -EIO;
1336 }
1337
1338 /* See the comment at receive_bitmap() */
1339 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1340 {
1341         struct bm_xfer_ctx c;
1342         int err;
1343
1344         if (!expect(mdev->bitmap))
1345                 return false;
1346
1347         if (get_ldev(mdev)) {
1348                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1349                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1350                         drbd_bm_set_all(mdev);
1351                         if (drbd_bm_write(mdev)) {
1352                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1353                                  * but otherwise process as per normal - need to tell other
1354                                  * side that a full resync is required! */
1355                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1356                         } else {
1357                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1358                                 drbd_md_sync(mdev);
1359                         }
1360                 }
1361                 put_ldev(mdev);
1362         }
1363
1364         c = (struct bm_xfer_ctx) {
1365                 .bm_bits = drbd_bm_bits(mdev),
1366                 .bm_words = drbd_bm_words(mdev),
1367         };
1368
1369         do {
1370                 err = send_bitmap_rle_or_plain(mdev, &c);
1371         } while (err > 0);
1372
1373         return err == 0;
1374 }
1375
1376 int drbd_send_bitmap(struct drbd_conf *mdev)
1377 {
1378         struct drbd_socket *sock = &mdev->tconn->data;
1379         int err = -1;
1380
1381         mutex_lock(&sock->mutex);
1382         if (sock->socket)
1383                 err = !_drbd_send_bitmap(mdev);
1384         mutex_unlock(&sock->mutex);
1385         return err;
1386 }
1387
1388 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1389 {
1390         struct drbd_socket *sock;
1391         struct p_barrier_ack *p;
1392
1393         if (mdev->state.conn < C_CONNECTED)
1394                 return;
1395
1396         sock = &mdev->tconn->meta;
1397         p = drbd_prepare_command(mdev, sock);
1398         if (!p)
1399                 return;
1400         p->barrier = barrier_nr;
1401         p->set_size = cpu_to_be32(set_size);
1402         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1403 }
1404
1405 /**
1406  * _drbd_send_ack() - Sends an ack packet
1407  * @mdev:       DRBD device.
1408  * @cmd:        Packet command code.
1409  * @sector:     sector, needs to be in big endian byte order
1410  * @blksize:    size in byte, needs to be in big endian byte order
1411  * @block_id:   Id, big endian byte order
1412  */
1413 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1414                           u64 sector, u32 blksize, u64 block_id)
1415 {
1416         struct drbd_socket *sock;
1417         struct p_block_ack *p;
1418
1419         if (mdev->state.conn < C_CONNECTED)
1420                 return -EIO;
1421
1422         sock = &mdev->tconn->meta;
1423         p = drbd_prepare_command(mdev, sock);
1424         if (!p)
1425                 return -EIO;
1426         p->sector = sector;
1427         p->block_id = block_id;
1428         p->blksize = blksize;
1429         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1430         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1431 }
1432
1433 /* dp->sector and dp->block_id already/still in network byte order,
1434  * data_size is payload size according to dp->head,
1435  * and may need to be corrected for digest size. */
1436 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1437                       struct p_data *dp, int data_size)
1438 {
1439         if (mdev->tconn->peer_integrity_tfm)
1440                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1441         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1442                        dp->block_id);
1443 }
1444
1445 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1446                       struct p_block_req *rp)
1447 {
1448         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1449 }
1450
1451 /**
1452  * drbd_send_ack() - Sends an ack packet
1453  * @mdev:       DRBD device
1454  * @cmd:        packet command code
1455  * @peer_req:   peer request
1456  */
1457 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1458                   struct drbd_peer_request *peer_req)
1459 {
1460         return _drbd_send_ack(mdev, cmd,
1461                               cpu_to_be64(peer_req->i.sector),
1462                               cpu_to_be32(peer_req->i.size),
1463                               peer_req->block_id);
1464 }
1465
1466 /* This function misuses the block_id field to signal if the blocks
1467  * are in sync or not. */
1468 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1469                      sector_t sector, int blksize, u64 block_id)
1470 {
1471         return _drbd_send_ack(mdev, cmd,
1472                               cpu_to_be64(sector),
1473                               cpu_to_be32(blksize),
1474                               cpu_to_be64(block_id));
1475 }
1476
1477 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1478                        sector_t sector, int size, u64 block_id)
1479 {
1480         struct drbd_socket *sock;
1481         struct p_block_req *p;
1482
1483         sock = &mdev->tconn->data;
1484         p = drbd_prepare_command(mdev, sock);
1485         if (!p)
1486                 return -EIO;
1487         p->sector = cpu_to_be64(sector);
1488         p->block_id = block_id;
1489         p->blksize = cpu_to_be32(size);
1490         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1491 }
1492
1493 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1494                             void *digest, int digest_size, enum drbd_packet cmd)
1495 {
1496         struct drbd_socket *sock;
1497         struct p_block_req *p;
1498
1499         /* FIXME: Put the digest into the preallocated socket buffer.  */
1500
1501         sock = &mdev->tconn->data;
1502         p = drbd_prepare_command(mdev, sock);
1503         if (!p)
1504                 return -EIO;
1505         p->sector = cpu_to_be64(sector);
1506         p->block_id = ID_SYNCER /* unused */;
1507         p->blksize = cpu_to_be32(size);
1508         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1509                                  digest, digest_size);
1510 }
1511
1512 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1513 {
1514         struct drbd_socket *sock;
1515         struct p_block_req *p;
1516
1517         sock = &mdev->tconn->data;
1518         p = drbd_prepare_command(mdev, sock);
1519         if (!p)
1520                 return -EIO;
1521         p->sector = cpu_to_be64(sector);
1522         p->block_id = ID_SYNCER /* unused */;
1523         p->blksize = cpu_to_be32(size);
1524         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1525 }
1526
1527 /* called on sndtimeo
1528  * returns false if we should retry,
1529  * true if we think connection is dead
1530  */
1531 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1532 {
1533         int drop_it;
1534         /* long elapsed = (long)(jiffies - mdev->last_received); */
1535
1536         drop_it =   tconn->meta.socket == sock
1537                 || !tconn->asender.task
1538                 || get_t_state(&tconn->asender) != RUNNING
1539                 || tconn->cstate < C_WF_REPORT_PARAMS;
1540
1541         if (drop_it)
1542                 return true;
1543
1544         drop_it = !--tconn->ko_count;
1545         if (!drop_it) {
1546                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1547                          current->comm, current->pid, tconn->ko_count);
1548                 request_ping(tconn);
1549         }
1550
1551         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1552 }
1553
1554 static void drbd_update_congested(struct drbd_tconn *tconn)
1555 {
1556         struct sock *sk = tconn->data.socket->sk;
1557         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1558                 set_bit(NET_CONGESTED, &tconn->flags);
1559 }
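/* Worked example (illustrative): with a 1 MiB send buffer (sk_sndbuf ==
 * 1048576), the connection is flagged NET_CONGESTED as soon as more than
 * 838860 bytes (4/5 of the buffer) are queued but not yet sent. */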
1560
1561 /* The idea of sendpage seems to be to put some kind of reference
1562  * to the page into the skb, and to hand it over to the NIC. In
1563  * this process get_page() gets called.
1564  *
1565  * As soon as the page was really sent over the network put_page()
1566  * gets called by some part of the network layer. [ NIC driver? ]
1567  *
1568  * [ get_page() / put_page() increment/decrement the count. If count
1569  *   reaches 0 the page will be freed. ]
1570  *
1571  * This works nicely with pages from FSs.
1572  * But this means that in protocol A we might signal IO completion too early!
1573  *
1574  * In order not to corrupt data during a resync we must make sure
1575  * that we do not reuse our own buffer pages (EEs) too early, therefore
1576  * we have the net_ee list.
1577  *
1578  * XFS still seems to have problems with this: it submits pages with page_count == 0!
1579  * As a workaround, we disable sendpage on pages
1580  * with page_count == 0 or PageSlab.
1581  */
1582 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1583                               int offset, size_t size, unsigned msg_flags)
1584 {
1585         struct socket *socket;
1586         void *addr;
1587         int err;
1588
1589         socket = mdev->tconn->data.socket;
1590         addr = kmap(page) + offset;
1591         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1592         kunmap(page);
1593         if (!err)
1594                 mdev->send_cnt += size >> 9;
1595         return err;
1596 }
1597
1598 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1599                     int offset, size_t size, unsigned msg_flags)
1600 {
1601         struct socket *socket = mdev->tconn->data.socket;
1602         mm_segment_t oldfs = get_fs();
1603         int len = size;
1604         int err = -EIO;
1605
1606         /* e.g. XFS meta- & log-data is in slab pages, which have a
1607          * page_count of 0 and/or have PageSlab() set.
1608          * we cannot use send_page for those, as that does get_page();
1609          * put_page(); and would cause either a VM_BUG directly, or
1610          * __page_cache_release() of a page that is actually still referenced
1611          * by someone, leading to some obscure delayed Oops somewhere else. */
1612         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1613                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1614
1615         msg_flags |= MSG_NOSIGNAL;
1616         drbd_update_congested(mdev->tconn);
1617         set_fs(KERNEL_DS);
1618         do {
1619                 int sent;
1620
1621                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1622                 if (sent <= 0) {
1623                         if (sent == -EAGAIN) {
1624                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1625                                         break;
1626                                 continue;
1627                         }
1628                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1629                              __func__, (int)size, len, sent);
1630                         if (sent < 0)
1631                                 err = sent;
1632                         break;
1633                 }
1634                 len    -= sent;
1635                 offset += sent;
1636         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1637         set_fs(oldfs);
1638         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1639
1640         if (len == 0) {
1641                 err = 0;
1642                 mdev->send_cnt += size >> 9;
1643         }
1644         return err;
1645 }
1646
1647 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1648 {
1649         struct bio_vec *bvec;
1650         int i;
1651         /* hint all but last page with MSG_MORE */
1652         __bio_for_each_segment(bvec, bio, i, 0) {
1653                 int err;
1654
1655                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1656                                          bvec->bv_offset, bvec->bv_len,
1657                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1658                 if (err)
1659                         return err;
1660         }
1661         return 0;
1662 }
1663
1664 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1665 {
1666         struct bio_vec *bvec;
1667         int i;
1668         /* hint all but last page with MSG_MORE */
1669         __bio_for_each_segment(bvec, bio, i, 0) {
1670                 int err;
1671
1672                 err = _drbd_send_page(mdev, bvec->bv_page,
1673                                       bvec->bv_offset, bvec->bv_len,
1674                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1675                 if (err)
1676                         return err;
1677         }
1678         return 0;
1679 }
1680
1681 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1682                             struct drbd_peer_request *peer_req)
1683 {
1684         struct page *page = peer_req->pages;
1685         unsigned len = peer_req->i.size;
1686         int err;
1687
1688         /* hint all but last page with MSG_MORE */
1689         page_chain_for_each(page) {
1690                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1691
1692                 err = _drbd_send_page(mdev, page, 0, l,
1693                                       page_chain_next(page) ? MSG_MORE : 0);
1694                 if (err)
1695                         return err;
1696                 len -= l;
1697         }
1698         return 0;
1699 }
1700
1701 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1702 {
1703         if (mdev->tconn->agreed_pro_version >= 95)
1704                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1705                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1706                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1707                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1708         else
1709                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1710 }
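/* The receiving side is expected to undo the mapping above.  A minimal sketch
 * of that inverse, assuming the same DP_* and REQ_* constants (an illustrative
 * helper, not the actual receiver code): */
static unsigned long __maybe_unused example_wire_flags_to_bio(u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
}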
1711
1712 /* Used to send write requests
1713  * R_PRIMARY -> Peer    (P_DATA)
1714  */
1715 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1716 {
1717         struct drbd_socket *sock;
1718         struct p_data *p;
1719         unsigned int dp_flags = 0;
1720         int dgs;
1721         int err;
1722
1723         sock = &mdev->tconn->data;
1724         p = drbd_prepare_command(mdev, sock);
1725         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1726                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1727
1728         if (!p)
1729                 return -EIO;
1730         p->sector = cpu_to_be64(req->i.sector);
1731         p->block_id = (unsigned long)req;
1732         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1733         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1734         if (mdev->state.conn >= C_SYNC_SOURCE &&
1735             mdev->state.conn <= C_PAUSED_SYNC_T)
1736                 dp_flags |= DP_MAY_SET_IN_SYNC;
1737         if (mdev->tconn->agreed_pro_version >= 100) {
1738                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1739                         dp_flags |= DP_SEND_RECEIVE_ACK;
1740                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1741                         dp_flags |= DP_SEND_WRITE_ACK;
1742         }
1743         p->dp_flags = cpu_to_be32(dp_flags);
1744         if (dgs)
1745                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1746         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1747         if (!err) {
1748                 /* For protocol A, we have to memcpy the payload into
1749                  * socket buffers, as we may complete right away
1750                  * as soon as we handed it over to tcp, at which point the data
1751                  * pages may become invalid.
1752                  *
1753                  * With data integrity enabled, we copy it as well, so we can be
1754                  * sure that even if the bio pages are still being modified, that
1755                  * won't change the data on the wire; thus, if the digest checks
1756                  * out ok after sending on this side but does not match on the
1757                  * receiving side, we know the corruption happened elsewhere.
1758                  */
1759                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1760                         err = _drbd_send_bio(mdev, req->master_bio);
1761                 else
1762                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1763
1764                 /* double check digest, sometimes buffers have been modified in flight. */
1765                 if (dgs > 0 && dgs <= 64) {
1766                         /* 64 byte, 512 bit, is the largest digest size
1767                          * currently supported in kernel crypto. */
1768                         unsigned char digest[64];
1769                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1770                         if (memcmp(p + 1, digest, dgs)) {
1771                                 dev_warn(DEV,
1772                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1773                                         (unsigned long long)req->i.sector, req->i.size);
1774                         }
1775                 } /* else if (dgs > 64) {
1776                      ... Be noisy about digest too large ...
1777                 } */
1778         }
1779         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1780
1781         return err;
1782 }
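/* Illustrative summary of the payload path chosen above:
 *
 *	no ack expected (protocol A), any digest  -> _drbd_send_bio()    (copied)
 *	ack expected, digest enabled              -> _drbd_send_bio()    (copied)
 *	ack expected, no digest                   -> _drbd_send_zc_bio() (zero copy)
 */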
1783
1784 /* answer packet, used to send data back for read requests:
1785  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1786  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1787  */
1788 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1789                     struct drbd_peer_request *peer_req)
1790 {
1791         struct drbd_socket *sock;
1792         struct p_data *p;
1793         int err;
1794         int dgs;
1795
1796         sock = &mdev->tconn->data;
1797         p = drbd_prepare_command(mdev, sock);
1798
1799         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1800                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1801
1802         if (!p)
1803                 return -EIO;
1804         p->sector = cpu_to_be64(peer_req->i.sector);
1805         p->block_id = peer_req->block_id;
1806         p->seq_num = 0;  /* unused */
1807         if (dgs)
1808                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1809         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1810         if (!err)
1811                 err = _drbd_send_zc_ee(mdev, peer_req);
1812         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1813
1814         return err;
1815 }
1816
1817 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1818 {
1819         struct drbd_socket *sock;
1820         struct p_block_desc *p;
1821
1822         sock = &mdev->tconn->data;
1823         p = drbd_prepare_command(mdev, sock);
1824         if (!p)
1825                 return -EIO;
1826         p->sector = cpu_to_be64(req->i.sector);
1827         p->blksize = cpu_to_be32(req->i.size);
1828         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1829 }
1830
1831 /*
1832   drbd_send distinguishes two cases:
1833
1834   Packets sent via the data socket "sock"
1835   and packets sent via the meta data socket "msock"
1836
1837                     sock                      msock
1838   -----------------+-------------------------+------------------------------
1839   timeout           conf.timeout / 2          conf.timeout / 2
1840   timeout action    send a ping via msock     Abort communication
1841                                               and close all sockets
1842 */
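/* A minimal sketch of how the timeout policy above could translate into a
 * socket option (hypothetical helper; the real setup happens when the
 * connection is established, and this assumes the configured timeout is
 * given in tenths of a second, as drbd's net configuration does): */
static void __maybe_unused example_apply_sndtimeo(struct socket *sock,
						  unsigned int timeout_ds)
{
	/* both sockets get half of the configured timeout as send timeout */
	sock->sk->sk_sndtimeo = timeout_ds * HZ / 10 / 2;
}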
1843
1844 /*
1845  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1846  */
1847 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1848               void *buf, size_t size, unsigned msg_flags)
1849 {
1850         struct kvec iov;
1851         struct msghdr msg;
1852         int rv, sent = 0;
1853
1854         if (!sock)
1855                 return -EBADR;
1856
1857         /* THINK  if (signal_pending) return ... ? */
1858
1859         iov.iov_base = buf;
1860         iov.iov_len  = size;
1861
1862         msg.msg_name       = NULL;
1863         msg.msg_namelen    = 0;
1864         msg.msg_control    = NULL;
1865         msg.msg_controllen = 0;
1866         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1867
1868         if (sock == tconn->data.socket) {
1869                 rcu_read_lock();
1870                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1871                 rcu_read_unlock();
1872                 drbd_update_congested(tconn);
1873         }
1874         do {
1875                 /* STRANGE
1876                  * tcp_sendmsg does _not_ use its size parameter at all ?
1877                  *
1878                  * -EAGAIN on timeout, -EINTR on signal.
1879                  */
1880 /* THINK
1881  * do we need to block DRBD_SIG if sock == &meta.socket ??
1882  * otherwise wake_asender() might interrupt some send_*Ack !
1883  */
1884                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1885                 if (rv == -EAGAIN) {
1886                         if (we_should_drop_the_connection(tconn, sock))
1887                                 break;
1888                         else
1889                                 continue;
1890                 }
1891                 if (rv == -EINTR) {
1892                         flush_signals(current);
1893                         rv = 0;
1894                 }
1895                 if (rv < 0)
1896                         break;
1897                 sent += rv;
1898                 iov.iov_base += rv;
1899                 iov.iov_len  -= rv;
1900         } while (sent < size);
1901
1902         if (sock == tconn->data.socket)
1903                 clear_bit(NET_CONGESTED, &tconn->flags);
1904
1905         if (rv <= 0) {
1906                 if (rv != -EAGAIN) {
1907                         conn_err(tconn, "%s_sendmsg returned %d\n",
1908                                  sock == tconn->meta.socket ? "msock" : "sock",
1909                                  rv);
1910                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1911                 } else
1912                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1913         }
1914
1915         return sent;
1916 }
1917
1918 /**
1919  * drbd_send_all  -  Send an entire buffer
1920  *
1921  * Returns 0 upon success and a negative error value otherwise.
1922  */
1923 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1924                   size_t size, unsigned msg_flags)
1925 {
1926         int err;
1927
1928         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1929         if (err < 0)
1930                 return err;
1931         if (err != size)
1932                 return -EIO;
1933         return 0;
1934 }
1935
1936 static int drbd_open(struct block_device *bdev, fmode_t mode)
1937 {
1938         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1939         unsigned long flags;
1940         int rv = 0;
1941
1942         mutex_lock(&drbd_main_mutex);
1943         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1944         /* to have a stable mdev->state.role
1945          * and no race with updating open_cnt */
1946
1947         if (mdev->state.role != R_PRIMARY) {
1948                 if (mode & FMODE_WRITE)
1949                         rv = -EROFS;
1950                 else if (!allow_oos)
1951                         rv = -EMEDIUMTYPE;
1952         }
1953
1954         if (!rv)
1955                 mdev->open_cnt++;
1956         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1957         mutex_unlock(&drbd_main_mutex);
1958
1959         return rv;
1960 }
1961
1962 static int drbd_release(struct gendisk *gd, fmode_t mode)
1963 {
1964         struct drbd_conf *mdev = gd->private_data;
1965         mutex_lock(&drbd_main_mutex);
1966         mdev->open_cnt--;
1967         mutex_unlock(&drbd_main_mutex);
1968         return 0;
1969 }
1970
1971 static void drbd_set_defaults(struct drbd_conf *mdev)
1972 {
1973         /* Beware! The actual layout differs
1974          * between big endian and little endian */
1975         mdev->state = (union drbd_dev_state) {
1976                 { .role = R_SECONDARY,
1977                   .peer = R_UNKNOWN,
1978                   .conn = C_STANDALONE,
1979                   .disk = D_DISKLESS,
1980                   .pdsk = D_UNKNOWN,
1981                 } };
1982 }
1983
1984 void drbd_init_set_defaults(struct drbd_conf *mdev)
1985 {
1986         /* the memset(,0,) did most of this.
1987          * note: only assignments, no allocation in here */
1988
1989         drbd_set_defaults(mdev);
1990
1991         atomic_set(&mdev->ap_bio_cnt, 0);
1992         atomic_set(&mdev->ap_pending_cnt, 0);
1993         atomic_set(&mdev->rs_pending_cnt, 0);
1994         atomic_set(&mdev->unacked_cnt, 0);
1995         atomic_set(&mdev->local_cnt, 0);
1996         atomic_set(&mdev->pp_in_use_by_net, 0);
1997         atomic_set(&mdev->rs_sect_in, 0);
1998         atomic_set(&mdev->rs_sect_ev, 0);
1999         atomic_set(&mdev->ap_in_flight, 0);
2000
2001         mutex_init(&mdev->md_io_mutex);
2002         mutex_init(&mdev->own_state_mutex);
2003         mdev->state_mutex = &mdev->own_state_mutex;
2004
2005         spin_lock_init(&mdev->al_lock);
2006         spin_lock_init(&mdev->peer_seq_lock);
2007         spin_lock_init(&mdev->epoch_lock);
2008
2009         INIT_LIST_HEAD(&mdev->active_ee);
2010         INIT_LIST_HEAD(&mdev->sync_ee);
2011         INIT_LIST_HEAD(&mdev->done_ee);
2012         INIT_LIST_HEAD(&mdev->read_ee);
2013         INIT_LIST_HEAD(&mdev->net_ee);
2014         INIT_LIST_HEAD(&mdev->resync_reads);
2015         INIT_LIST_HEAD(&mdev->resync_work.list);
2016         INIT_LIST_HEAD(&mdev->unplug_work.list);
2017         INIT_LIST_HEAD(&mdev->go_diskless.list);
2018         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2019         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2020         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2021
2022         mdev->resync_work.cb  = w_resync_timer;
2023         mdev->unplug_work.cb  = w_send_write_hint;
2024         mdev->go_diskless.cb  = w_go_diskless;
2025         mdev->md_sync_work.cb = w_md_sync;
2026         mdev->bm_io_work.w.cb = w_bitmap_io;
2027         mdev->start_resync_work.cb = w_start_resync;
2028
2029         mdev->resync_work.mdev  = mdev;
2030         mdev->unplug_work.mdev  = mdev;
2031         mdev->go_diskless.mdev  = mdev;
2032         mdev->md_sync_work.mdev = mdev;
2033         mdev->bm_io_work.w.mdev = mdev;
2034         mdev->start_resync_work.mdev = mdev;
2035
2036         init_timer(&mdev->resync_timer);
2037         init_timer(&mdev->md_sync_timer);
2038         init_timer(&mdev->start_resync_timer);
2039         init_timer(&mdev->request_timer);
2040         mdev->resync_timer.function = resync_timer_fn;
2041         mdev->resync_timer.data = (unsigned long) mdev;
2042         mdev->md_sync_timer.function = md_sync_timer_fn;
2043         mdev->md_sync_timer.data = (unsigned long) mdev;
2044         mdev->start_resync_timer.function = start_resync_timer_fn;
2045         mdev->start_resync_timer.data = (unsigned long) mdev;
2046         mdev->request_timer.function = request_timer_fn;
2047         mdev->request_timer.data = (unsigned long) mdev;
2048
2049         init_waitqueue_head(&mdev->misc_wait);
2050         init_waitqueue_head(&mdev->state_wait);
2051         init_waitqueue_head(&mdev->ee_wait);
2052         init_waitqueue_head(&mdev->al_wait);
2053         init_waitqueue_head(&mdev->seq_wait);
2054
2055         mdev->write_ordering = WO_bdev_flush;
2056         mdev->resync_wenr = LC_FREE;
2057         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2058         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2059 }
2060
2061 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2062 {
2063         int i;
2064         if (mdev->tconn->receiver.t_state != NONE)
2065                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2066                                 mdev->tconn->receiver.t_state);
2067
2068         /* no need to lock it, I'm the only thread alive */
2069         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2070                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2071         mdev->al_writ_cnt  =
2072         mdev->bm_writ_cnt  =
2073         mdev->read_cnt     =
2074         mdev->recv_cnt     =
2075         mdev->send_cnt     =
2076         mdev->writ_cnt     =
2077         mdev->p_size       =
2078         mdev->rs_start     =
2079         mdev->rs_total     =
2080         mdev->rs_failed    = 0;
2081         mdev->rs_last_events = 0;
2082         mdev->rs_last_sect_ev = 0;
2083         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2084                 mdev->rs_mark_left[i] = 0;
2085                 mdev->rs_mark_time[i] = 0;
2086         }
2087         D_ASSERT(mdev->tconn->net_conf == NULL);
2088
2089         drbd_set_my_capacity(mdev, 0);
2090         if (mdev->bitmap) {
2091                 /* maybe never allocated. */
2092                 drbd_bm_resize(mdev, 0, 1);
2093                 drbd_bm_cleanup(mdev);
2094         }
2095
2096         drbd_free_bc(mdev->ldev);
2097         mdev->ldev = NULL;
2098
2099         clear_bit(AL_SUSPENDED, &mdev->flags);
2100
2101         D_ASSERT(list_empty(&mdev->active_ee));
2102         D_ASSERT(list_empty(&mdev->sync_ee));
2103         D_ASSERT(list_empty(&mdev->done_ee));
2104         D_ASSERT(list_empty(&mdev->read_ee));
2105         D_ASSERT(list_empty(&mdev->net_ee));
2106         D_ASSERT(list_empty(&mdev->resync_reads));
2107         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2108         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2109         D_ASSERT(list_empty(&mdev->resync_work.list));
2110         D_ASSERT(list_empty(&mdev->unplug_work.list));
2111         D_ASSERT(list_empty(&mdev->go_diskless.list));
2112
2113         drbd_set_defaults(mdev);
2114 }
2115
2116
2117 static void drbd_destroy_mempools(void)
2118 {
2119         struct page *page;
2120
2121         while (drbd_pp_pool) {
2122                 page = drbd_pp_pool;
2123                 drbd_pp_pool = (struct page *)page_private(page);
2124                 __free_page(page);
2125                 drbd_pp_vacant--;
2126         }
2127
2128         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2129
2130         if (drbd_md_io_bio_set)
2131                 bioset_free(drbd_md_io_bio_set);
2132         if (drbd_md_io_page_pool)
2133                 mempool_destroy(drbd_md_io_page_pool);
2134         if (drbd_ee_mempool)
2135                 mempool_destroy(drbd_ee_mempool);
2136         if (drbd_request_mempool)
2137                 mempool_destroy(drbd_request_mempool);
2138         if (drbd_ee_cache)
2139                 kmem_cache_destroy(drbd_ee_cache);
2140         if (drbd_request_cache)
2141                 kmem_cache_destroy(drbd_request_cache);
2142         if (drbd_bm_ext_cache)
2143                 kmem_cache_destroy(drbd_bm_ext_cache);
2144         if (drbd_al_ext_cache)
2145                 kmem_cache_destroy(drbd_al_ext_cache);
2146
2147         drbd_md_io_bio_set   = NULL;
2148         drbd_md_io_page_pool = NULL;
2149         drbd_ee_mempool      = NULL;
2150         drbd_request_mempool = NULL;
2151         drbd_ee_cache        = NULL;
2152         drbd_request_cache   = NULL;
2153         drbd_bm_ext_cache    = NULL;
2154         drbd_al_ext_cache    = NULL;
2155
2156         return;
2157 }
2158
2159 static int drbd_create_mempools(void)
2160 {
2161         struct page *page;
2162         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2163         int i;
2164
2165         /* prepare our caches and mempools */
2166         drbd_request_mempool = NULL;
2167         drbd_ee_cache        = NULL;
2168         drbd_request_cache   = NULL;
2169         drbd_bm_ext_cache    = NULL;
2170         drbd_al_ext_cache    = NULL;
2171         drbd_pp_pool         = NULL;
2172         drbd_md_io_page_pool = NULL;
2173         drbd_md_io_bio_set   = NULL;
2174
2175         /* caches */
2176         drbd_request_cache = kmem_cache_create(
2177                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2178         if (drbd_request_cache == NULL)
2179                 goto Enomem;
2180
2181         drbd_ee_cache = kmem_cache_create(
2182                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2183         if (drbd_ee_cache == NULL)
2184                 goto Enomem;
2185
2186         drbd_bm_ext_cache = kmem_cache_create(
2187                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2188         if (drbd_bm_ext_cache == NULL)
2189                 goto Enomem;
2190
2191         drbd_al_ext_cache = kmem_cache_create(
2192                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2193         if (drbd_al_ext_cache == NULL)
2194                 goto Enomem;
2195
2196         /* mempools */
2197         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2198         if (drbd_md_io_bio_set == NULL)
2199                 goto Enomem;
2200
2201         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2202         if (drbd_md_io_page_pool == NULL)
2203                 goto Enomem;
2204
2205         drbd_request_mempool = mempool_create(number,
2206                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2207         if (drbd_request_mempool == NULL)
2208                 goto Enomem;
2209
2210         drbd_ee_mempool = mempool_create(number,
2211                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2212         if (drbd_ee_mempool == NULL)
2213                 goto Enomem;
2214
2215         /* drbd's page pool */
2216         spin_lock_init(&drbd_pp_lock);
2217
2218         for (i = 0; i < number; i++) {
2219                 page = alloc_page(GFP_HIGHUSER);
2220                 if (!page)
2221                         goto Enomem;
2222                 set_page_private(page, (unsigned long)drbd_pp_pool);
2223                 drbd_pp_pool = page;
2224         }
2225         drbd_pp_vacant = number;
2226
2227         return 0;
2228
2229 Enomem:
2230         drbd_destroy_mempools(); /* in case we allocated some */
2231         return -ENOMEM;
2232 }
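/* Minimal sketch of how a consumer would take a page off the pool built above
 * (hypothetical helper under the same globals; the real allocation path lives
 * elsewhere in drbd): pages are chained through their ->private field, so we
 * pop from the head under drbd_pp_lock. */
static struct page * __maybe_unused example_pp_pop(void)
{
	struct page *page;

	spin_lock(&drbd_pp_lock);
	page = drbd_pp_pool;
	if (page) {
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	spin_unlock(&drbd_pp_lock);
	return page;
}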
2233
2234 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2235         void *unused)
2236 {
2237         /* just so we have it.  you never know what interesting things we
2238          * might want to do here some day...
2239          */
2240
2241         return NOTIFY_DONE;
2242 }
2243
2244 static struct notifier_block drbd_notifier = {
2245         .notifier_call = drbd_notify_sys,
2246 };
2247
2248 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2249 {
2250         int rr;
2251
2252         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2253         if (rr)
2254                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2255
2256         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2257         if (rr)
2258                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2259
2260         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2261         if (rr)
2262                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2263
2264         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2265         if (rr)
2266                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2267
2268         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2269         if (rr)
2270                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2271 }
2272
2273 /* caution. no locking. */
2274 void drbd_minor_destroy(struct kref *kref)
2275 {
2276         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2277         struct drbd_tconn *tconn = mdev->tconn;
2278
2279         /* paranoia asserts */
2280         D_ASSERT(mdev->open_cnt == 0);
2281         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2282         /* end paranoia asserts */
2283
2284         /* cleanup stuff that may have been allocated during
2285          * device (re-)configuration or state changes */
2286
2287         if (mdev->this_bdev)
2288                 bdput(mdev->this_bdev);
2289
2290         drbd_free_bc(mdev->ldev);
2291         mdev->ldev = NULL;
2292
2293         drbd_release_all_peer_reqs(mdev);
2294
2295         lc_destroy(mdev->act_log);
2296         lc_destroy(mdev->resync);
2297
2298         kfree(mdev->p_uuid);
2299         /* mdev->p_uuid = NULL; */
2300
2301         kfree(mdev->current_epoch);
2302         if (mdev->bitmap) /* should no longer be there. */
2303                 drbd_bm_cleanup(mdev);
2304         __free_page(mdev->md_io_page);
2305         put_disk(mdev->vdisk);
2306         blk_cleanup_queue(mdev->rq_queue);
2307         kfree(mdev->rs_plan_s);
2308         kfree(mdev);
2309
2310         kref_put(&tconn->kref, &conn_destroy);
2311 }
2312
2313 static void drbd_cleanup(void)
2314 {
2315         unsigned int i;
2316         struct drbd_conf *mdev;
2317         struct drbd_tconn *tconn, *tmp;
2318
2319         unregister_reboot_notifier(&drbd_notifier);
2320
2321         /* first remove proc,
2322          * drbdsetup uses its presence to detect
2323          * whether DRBD is loaded.
2324          * If we got stuck in proc removal,
2325          * but have netlink already deregistered,
2326          * some drbdsetup commands may wait forever
2327          * for an answer.
2328          */
2329         if (drbd_proc)
2330                 remove_proc_entry("drbd", NULL);
2331
2332         drbd_genl_unregister();
2333
2334         idr_for_each_entry(&minors, mdev, i) {
2335                 idr_remove(&minors, mdev_to_minor(mdev));
2336                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2337                 del_gendisk(mdev->vdisk);
2338                 /* synchronize_rcu(); No other threads running at this point */
2339                 kref_put(&mdev->kref, &drbd_minor_destroy);
2340         }
2341
2342         /* not _rcu, since there is no other updater anymore; genl is already unregistered */
2343         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2344                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2345                 /* synchronize_rcu(); */
2346                 kref_put(&tconn->kref, &conn_destroy);
2347         }
2348
2349         drbd_destroy_mempools();
2350         unregister_blkdev(DRBD_MAJOR, "drbd");
2351
2352         idr_destroy(&minors);
2353
2354         printk(KERN_INFO "drbd: module cleanup done.\n");
2355 }
2356
2357 /**
2358  * drbd_congested() - Callback for pdflush
2359  * @congested_data:     User data
2360  * @bdi_bits:           Bits pdflush is currently interested in
2361  *
2362  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2363  */
2364 static int drbd_congested(void *congested_data, int bdi_bits)
2365 {
2366         struct drbd_conf *mdev = congested_data;
2367         struct request_queue *q;
2368         char reason = '-';
2369         int r = 0;
2370
2371         if (!may_inc_ap_bio(mdev)) {
2372                 /* DRBD has frozen IO */
2373                 r = bdi_bits;
2374                 reason = 'd';
2375                 goto out;
2376         }
2377
2378         if (get_ldev(mdev)) {
2379                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2380                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2381                 put_ldev(mdev);
2382                 if (r)
2383                         reason = 'b';
2384         }
2385
2386         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2387                 r |= (1 << BDI_async_congested);
2388                 reason = reason == 'b' ? 'a' : 'n';
2389         }
2390
2391 out:
2392         mdev->congestion_reason = reason;
2393         return r;
2394 }
2395
2396 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2397 {
2398         sema_init(&wq->s, 0);
2399         spin_lock_init(&wq->q_lock);
2400         INIT_LIST_HEAD(&wq->q);
2401 }
2402
2403 struct drbd_tconn *conn_get_by_name(const char *name)
2404 {
2405         struct drbd_tconn *tconn;
2406
2407         if (!name || !name[0])
2408                 return NULL;
2409
2410         rcu_read_lock();
2411         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2412                 if (!strcmp(tconn->name, name)) {
2413                         kref_get(&tconn->kref);
2414                         goto found;
2415                 }
2416         }
2417         tconn = NULL;
2418 found:
2419         rcu_read_unlock();
2420         return tconn;
2421 }
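/* Reference counting sketch (illustrative; "resource0" is a made-up name):
 * every successful lookup takes a kref, so callers pair it with a put:
 *
 *	struct drbd_tconn *tconn = conn_get_by_name("resource0");
 *	if (tconn) {
 *		... use tconn ...
 *		kref_put(&tconn->kref, &conn_destroy);
 *	}
 */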
2422
2423 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2424                                      void *peer_addr, int peer_addr_len)
2425 {
2426         struct drbd_tconn *tconn;
2427
2428         rcu_read_lock();
2429         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2430                 if (tconn->my_addr_len == my_addr_len &&
2431                     tconn->peer_addr_len == peer_addr_len &&
2432                     !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2433                     !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2434                         kref_get(&tconn->kref);
2435                         goto found;
2436                 }
2437         }
2438         tconn = NULL;
2439 found:
2440         rcu_read_unlock();
2441         return tconn;
2442 }
2443
2444 static int drbd_alloc_socket(struct drbd_socket *socket)
2445 {
2446         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2447         if (!socket->rbuf)
2448                 return -ENOMEM;
2449         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2450         if (!socket->sbuf)
2451                 return -ENOMEM;
2452         return 0;
2453 }
2454
2455 static void drbd_free_socket(struct drbd_socket *socket)
2456 {
2457         free_page((unsigned long) socket->sbuf);
2458         free_page((unsigned long) socket->rbuf);
2459 }
2460
2461 void conn_free_crypto(struct drbd_tconn *tconn)
2462 {
2463         drbd_free_sock(tconn);
2464
2465         crypto_free_hash(tconn->csums_tfm);
2466         crypto_free_hash(tconn->verify_tfm);
2467         crypto_free_hash(tconn->cram_hmac_tfm);
2468         crypto_free_hash(tconn->integrity_tfm);
2469         crypto_free_hash(tconn->peer_integrity_tfm);
2470         kfree(tconn->int_dig_in);
2471         kfree(tconn->int_dig_vv);
2472
2473         tconn->csums_tfm = NULL;
2474         tconn->verify_tfm = NULL;
2475         tconn->cram_hmac_tfm = NULL;
2476         tconn->integrity_tfm = NULL;
2477         tconn->peer_integrity_tfm = NULL;
2478         tconn->int_dig_in = NULL;
2479         tconn->int_dig_vv = NULL;
2480 }
2481
2482 /* caller must be under genl_lock() */
2483 struct drbd_tconn *conn_create(const char *name)
2484 {
2485         struct drbd_tconn *tconn;
2486
2487         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2488         if (!tconn)
2489                 return NULL;
2490
2491         tconn->name = kstrdup(name, GFP_KERNEL);
2492         if (!tconn->name)
2493                 goto fail;
2494
2495         if (drbd_alloc_socket(&tconn->data))
2496                 goto fail;
2497         if (drbd_alloc_socket(&tconn->meta))
2498                 goto fail;
2499
2500         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2501                 goto fail;
2502
2503         if (!tl_init(tconn))
2504                 goto fail;
2505
2506         tconn->cstate = C_STANDALONE;
2507         mutex_init(&tconn->cstate_mutex);
2508         spin_lock_init(&tconn->req_lock);
2509         mutex_init(&tconn->conf_update);
2510         init_waitqueue_head(&tconn->ping_wait);
2511         idr_init(&tconn->volumes);
2512
2513         drbd_init_workqueue(&tconn->data.work);
2514         mutex_init(&tconn->data.mutex);
2515
2516         drbd_init_workqueue(&tconn->meta.work);
2517         mutex_init(&tconn->meta.mutex);
2518
2519         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2520         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2521         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2522
2523         drbd_set_res_opts_defaults(&tconn->res_opts);
2524
2525         kref_init(&tconn->kref);
2526         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2527
2528         return tconn;
2529
2530 fail:
2531         tl_cleanup(tconn);
2532         free_cpumask_var(tconn->cpu_mask);
2533         drbd_free_socket(&tconn->meta);
2534         drbd_free_socket(&tconn->data);
2535         kfree(tconn->name);
2536         kfree(tconn);
2537
2538         return NULL;
2539 }
2540
2541 void conn_destroy(struct kref *kref)
2542 {
2543         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2544
2545         idr_destroy(&tconn->volumes);
2546
2547         free_cpumask_var(tconn->cpu_mask);
2548         drbd_free_socket(&tconn->meta);
2549         drbd_free_socket(&tconn->data);
2550         kfree(tconn->name);
2551         kfree(tconn->int_dig_in);
2552         kfree(tconn->int_dig_vv);
2553         kfree(tconn);
2554 }
2555
2556 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2557 {
2558         struct drbd_conf *mdev;
2559         struct gendisk *disk;
2560         struct request_queue *q;
2561         int vnr_got = vnr;
2562         int minor_got = minor;
2563         enum drbd_ret_code err = ERR_NOMEM;
2564
2565         mdev = minor_to_mdev(minor);
2566         if (mdev)
2567                 return ERR_MINOR_EXISTS;
2568
2569         /* GFP_KERNEL, we are outside of all write-out paths */
2570         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2571         if (!mdev)
2572                 return ERR_NOMEM;
2573
2574         kref_get(&tconn->kref);
2575         mdev->tconn = tconn;
2576
2577         mdev->minor = minor;
2578         mdev->vnr = vnr;
2579
2580         drbd_init_set_defaults(mdev);
2581
2582         q = blk_alloc_queue(GFP_KERNEL);
2583         if (!q)
2584                 goto out_no_q;
2585         mdev->rq_queue = q;
2586         q->queuedata   = mdev;
2587
2588         disk = alloc_disk(1);
2589         if (!disk)
2590                 goto out_no_disk;
2591         mdev->vdisk = disk;
2592
2593         set_disk_ro(disk, true);
2594
2595         disk->queue = q;
2596         disk->major = DRBD_MAJOR;
2597         disk->first_minor = minor;
2598         disk->fops = &drbd_ops;
2599         sprintf(disk->disk_name, "drbd%d", minor);
2600         disk->private_data = mdev;
2601
2602         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2603         /* we have no partitions. we contain only ourselves. */
2604         mdev->this_bdev->bd_contains = mdev->this_bdev;
2605
2606         q->backing_dev_info.congested_fn = drbd_congested;
2607         q->backing_dev_info.congested_data = mdev;
2608
2609         blk_queue_make_request(q, drbd_make_request);
2610         /* Setting max_hw_sectors to an odd value of 8 KiB here;
2611            this triggers a max_bio_size message upon first attach or connect. */
2612         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2613         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2614         blk_queue_merge_bvec(q, drbd_merge_bvec);
2615         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2616
2617         mdev->md_io_page = alloc_page(GFP_KERNEL);
2618         if (!mdev->md_io_page)
2619                 goto out_no_io_page;
2620
2621         if (drbd_bm_init(mdev))
2622                 goto out_no_bitmap;
2623         mdev->read_requests = RB_ROOT;
2624         mdev->write_requests = RB_ROOT;
2625
2626         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2627         if (!mdev->current_epoch)
2628                 goto out_no_epoch;
2629
2630         INIT_LIST_HEAD(&mdev->current_epoch->list);
2631         mdev->epochs = 1;
2632
2633         if (!idr_pre_get(&minors, GFP_KERNEL))
2634                 goto out_no_minor_idr;
2635         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2636                 goto out_no_minor_idr;
2637         if (minor_got != minor) {
2638                 err = ERR_MINOR_EXISTS;
2639                 drbd_msg_put_info("requested minor exists already");
2640                 goto out_idr_remove_minor;
2641         }
2642
2643         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2644                 goto out_idr_remove_minor;
2645         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2646                 goto out_idr_remove_minor;
2647         if (vnr_got != vnr) {
2648                 err = ERR_INVALID_REQUEST;
2649                 drbd_msg_put_info("requested volume exists already");
2650                 goto out_idr_remove_vol;
2651         }
2652         add_disk(disk);
2653         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2654
2655         /* inherit the connection state */
2656         mdev->state.conn = tconn->cstate;
2657         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2658                 drbd_connected(mdev);
2659
2660         return NO_ERROR;
2661
2662 out_idr_remove_vol:
2663         idr_remove(&tconn->volumes, vnr_got);
2664 out_idr_remove_minor:
2665         idr_remove(&minors, minor_got);
2666         synchronize_rcu();
2667 out_no_minor_idr:
2668         kfree(mdev->current_epoch);
2669 out_no_epoch:
2670         drbd_bm_cleanup(mdev);
2671 out_no_bitmap:
2672         __free_page(mdev->md_io_page);
2673 out_no_io_page:
2674         put_disk(disk);
2675 out_no_disk:
2676         blk_cleanup_queue(q);
2677 out_no_q:
2678         kfree(mdev);
2679         kref_put(&tconn->kref, &conn_destroy);
2680         return err;
2681 }
2682
2683 int __init drbd_init(void)
2684 {
2685         int err;
2686
2687         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2688                 printk(KERN_ERR
2689                        "drbd: invalid minor_count (%d)\n", minor_count);
2690 #ifdef MODULE
2691                 return -EINVAL;
2692 #else
2693                 minor_count = DRBD_MINOR_COUNT_DEF;
2694 #endif
2695         }
2696
2697         err = register_blkdev(DRBD_MAJOR, "drbd");
2698         if (err) {
2699                 printk(KERN_ERR
2700                        "drbd: unable to register block device major %d\n",
2701                        DRBD_MAJOR);
2702                 return err;
2703         }
2704
2705         err = drbd_genl_register();
2706         if (err) {
2707                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2708                 goto fail;
2709         }
2710
2711
2712         register_reboot_notifier(&drbd_notifier);
2713
2714         /*
2715          * allocate all necessary structs
2716          */
2717         err = -ENOMEM;
2718
2719         init_waitqueue_head(&drbd_pp_wait);
2720
2721         drbd_proc = NULL; /* play safe for drbd_cleanup */
2722         idr_init(&minors);
2723
2724         err = drbd_create_mempools();
2725         if (err)
2726                 goto fail;
2727
2728         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2729         if (!drbd_proc) {
2730                 printk(KERN_ERR "drbd: unable to register proc file\n");
2731                 goto fail;
2732         }
2733
2734         rwlock_init(&global_state_lock);
2735         INIT_LIST_HEAD(&drbd_tconns);
2736
2737         printk(KERN_INFO "drbd: initialized. "
2738                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2739                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2740         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2741         printk(KERN_INFO "drbd: registered as block device major %d\n",
2742                 DRBD_MAJOR);
2743
2744         return 0; /* Success! */
2745
2746 fail:
2747         drbd_cleanup();
2748         if (err == -ENOMEM)
2749                 /* currently always the case */
2750                 printk(KERN_ERR "drbd: ran out of memory\n");
2751         else
2752                 printk(KERN_ERR "drbd: initialization failure\n");
2753         return err;
2754 }
2755
2756 void drbd_free_bc(struct drbd_backing_dev *ldev)
2757 {
2758         if (ldev == NULL)
2759                 return;
2760
2761         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2762         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2763
2764         kfree(ldev);
2765 }
2766
2767 void drbd_free_sock(struct drbd_tconn *tconn)
2768 {
2769         if (tconn->data.socket) {
2770                 mutex_lock(&tconn->data.mutex);
2771                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2772                 sock_release(tconn->data.socket);
2773                 tconn->data.socket = NULL;
2774                 mutex_unlock(&tconn->data.mutex);
2775         }
2776         if (tconn->meta.socket) {
2777                 mutex_lock(&tconn->meta.mutex);
2778                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2779                 sock_release(tconn->meta.socket);
2780                 tconn->meta.socket = NULL;
2781                 mutex_unlock(&tconn->meta.mutex);
2782         }
2783 }
2784
2785 /* meta data management */
2786
2787 struct meta_data_on_disk {
2788         u64 la_size;           /* last agreed size. */
2789         u64 uuid[UI_SIZE];   /* UUIDs. */
2790         u64 device_uuid;
2791         u64 reserved_u64_1;
2792         u32 flags;             /* MDF */
2793         u32 magic;
2794         u32 md_size_sect;
2795         u32 al_offset;         /* offset to this block */
2796         u32 al_nr_extents;     /* important for restoring the AL */
2797               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2798         u32 bm_offset;         /* offset to the bitmap, from here */
2799         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2800         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2801         u32 reserved_u32[3];
2802
2803 } __packed;
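/* The superblock is written as a single 512 byte sector (see the memset in
 * drbd_md_sync() below), so this layout must never outgrow that.  A
 * compile-time check along these lines (illustrative, not present in this
 * file) would document the assumption: */
static inline void example_md_layout_check(void)
{
	/* must fit into the 512 byte buffer written out by drbd_md_sync() */
	BUILD_BUG_ON(sizeof(struct meta_data_on_disk) > 512);
}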
2804
2805 /**
2806  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2807  * @mdev:       DRBD device.
2808  */
2809 void drbd_md_sync(struct drbd_conf *mdev)
2810 {
2811         struct meta_data_on_disk *buffer;
2812         sector_t sector;
2813         int i;
2814
2815         del_timer(&mdev->md_sync_timer);
2816         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2817         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2818                 return;
2819
2820         /* We use D_FAILED here and not D_ATTACHING because we try to write
2821          * metadata even if we detach due to a disk failure! */
2822         if (!get_ldev_if_state(mdev, D_FAILED))
2823                 return;
2824
2825         mutex_lock(&mdev->md_io_mutex);
2826         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2827         memset(buffer, 0, 512);
2828
2829         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2830         for (i = UI_CURRENT; i < UI_SIZE; i++)
2831                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2832         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2833         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2834
2835         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2836         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2837         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2838         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2839         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2840
2841         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2842         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2843
2844         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2845         sector = mdev->ldev->md.md_offset;
2846
2847         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2848                 /* this was a try anyways ... */
2849                 dev_err(DEV, "meta data update failed!\n");
2850                 drbd_chk_io_error(mdev, 1, true);
2851         }
2852
2853         /* Update mdev->ldev->md.la_size_sect,
2854          * since we just committed it to the on-disk metadata. */
2855         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2856
2857         mutex_unlock(&mdev->md_io_mutex);
2858         put_ldev(mdev);
2859 }
2860
2861 /**
2862  * drbd_md_read() - Reads in the meta data super block
2863  * @mdev:       DRBD device.
2864  * @bdev:       Device from which the meta data should be read in.
2865  *
2866  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2867  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2868  */
2869 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2870 {
2871         struct meta_data_on_disk *buffer;
2872         int i, rv = NO_ERROR;
2873
2874         if (!get_ldev_if_state(mdev, D_ATTACHING))
2875                 return ERR_IO_MD_DISK;
2876
2877         mutex_lock(&mdev->md_io_mutex);
2878         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2879
2880         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2881                 /* NOTE: can't do normal error processing here as this is
2882                    called BEFORE disk is attached */
2883                 dev_err(DEV, "Error while reading metadata.\n");
2884                 rv = ERR_IO_MD_DISK;
2885                 goto err;
2886         }
2887
2888         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2889                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2890                 rv = ERR_MD_INVALID;
2891                 goto err;
2892         }
2893         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2894                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2895                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2896                 rv = ERR_MD_INVALID;
2897                 goto err;
2898         }
2899         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2900                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2901                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2902                 rv = ERR_MD_INVALID;
2903                 goto err;
2904         }
2905         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2906                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2907                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2908                 rv = ERR_MD_INVALID;
2909                 goto err;
2910         }
2911
2912         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2913                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2914                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2915                 rv = ERR_MD_INVALID;
2916                 goto err;
2917         }
2918
2919         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2920         for (i = UI_CURRENT; i < UI_SIZE; i++)
2921                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2922         bdev->md.flags = be32_to_cpu(buffer->flags);
2923         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2924
2925         spin_lock_irq(&mdev->tconn->req_lock);
2926         if (mdev->state.conn < C_CONNECTED) {
2927                 int peer;
2928                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2929                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2930                 mdev->peer_max_bio_size = peer;
2931         }
2932         spin_unlock_irq(&mdev->tconn->req_lock);
2933
2934         /* This block wants to get removed... */
2935         bdev->disk_conf->al_extents = be32_to_cpu(buffer->al_nr_extents);
2936         if (bdev->disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
2937                 bdev->disk_conf->al_extents = DRBD_AL_EXTENTS_DEF;
2938
2939  err:
2940         mutex_unlock(&mdev->md_io_mutex);
2941         put_ldev(mdev);
2942
2943         return rv;
2944 }
2945
2946 /**
2947  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2948  * @mdev:       DRBD device.
2949  *
2950  * Call this function if you change anything that should be written to
2951  * the meta-data super block. This function sets MD_DIRTY, and starts a
2952  * timer that ensures drbd_md_sync() gets called within five seconds.
2953  */
2954 #ifdef DEBUG
2955 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2956 {
2957         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2958                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2959                 mdev->last_md_mark_dirty.line = line;
2960                 mdev->last_md_mark_dirty.func = func;
2961         }
2962 }
2963 #else
2964 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2965 {
2966         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2967                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2968 }
2969 #endif
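/* Rough MD_DIRTY lifecycle (sketch based on the code in this file): a
 * metadata modifier calls drbd_md_mark_dirty(), which arms md_sync_timer on
 * the 0 -> 1 transition of the flag.  Normally drbd_md_sync() is called
 * shortly afterwards and writes the super block; if that does not happen,
 * the timer fires, md_sync_timer_fn() queues md_sync_work, and w_md_sync()
 * (further down in this file) warns and performs the sync from worker
 * context. */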
2970
2971 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2972 {
2973         int i;
2974
2975         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2976                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2977 }
2978
2979 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2980 {
2981         if (idx == UI_CURRENT) {
2982                 if (mdev->state.role == R_PRIMARY)
2983                         val |= 1;
2984                 else
2985                         val &= ~((u64)1);
2986
2987                 drbd_set_ed_uuid(mdev, val);
2988         }
2989
2990         mdev->ldev->md.uuid[idx] = val;
2991         drbd_md_mark_dirty(mdev);
2992 }
2993
2994
2995 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2996 {
2997         if (mdev->ldev->md.uuid[idx]) {
2998                 drbd_uuid_move_history(mdev);
2999                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3000         }
3001         _drbd_uuid_set(mdev, idx, val);
3002 }
3003
3004 /**
3005  * drbd_uuid_new_current() - Creates a new current UUID
3006  * @mdev:       DRBD device.
3007  *
3008  * Creates a new current UUID, and rotates the old current UUID into
3009  * the bitmap slot. Causes an incremental resync upon next connect.
3010  */
3011 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3012 {
3013         u64 val;
3014         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3015
3016         if (bm_uuid)
3017                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3018
3019         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3020
3021         get_random_bytes(&val, sizeof(u64));
3022         _drbd_uuid_set(mdev, UI_CURRENT, val);
3023         drbd_print_uuids(mdev, "new current UUID");
3024         /* get it to stable storage _now_ */
3025         drbd_md_sync(mdev);
3026 }
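/* Illustrative use (a sketch, not a call site from this file): when a Primary
 * loses its peer, drbd_uuid_new_current() is typically invoked so that
 * UI_BITMAP keeps the UUID the peer last agreed on while UI_CURRENT diverges;
 * on the next connect the UUID comparison then selects a bitmap-based
 * (incremental) resync, as described in the kernel-doc above. */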
3027
3028 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3029 {
3030         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3031                 return;
3032
3033         if (val == 0) {
3034                 drbd_uuid_move_history(mdev);
3035                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3036                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3037         } else {
3038                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3039                 if (bm_uuid)
3040                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3041
3042                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3043         }
3044         drbd_md_mark_dirty(mdev);
3045 }
3046
3047 /**
3048  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3049  * @mdev:       DRBD device.
3050  *
3051  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3052  */
3053 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3054 {
3055         int rv = -EIO;
3056
3057         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3058                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3059                 drbd_md_sync(mdev);
3060                 drbd_bm_set_all(mdev);
3061
3062                 rv = drbd_bm_write(mdev);
3063
3064                 if (!rv) {
3065                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3066                         drbd_md_sync(mdev);
3067                 }
3068
3069                 put_ldev(mdev);
3070         }
3071
3072         return rv;
3073 }
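/* The ordering above is deliberate: MDF_FULL_SYNC is made persistent via
 * drbd_md_sync() *before* the bits are set and written, so a crash in the
 * middle still leaves a flag on disk that forces a full sync later; only
 * after the bitmap write succeeded is the flag cleared again. */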
3074
3075 /**
3076  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3077  * @mdev:       DRBD device.
3078  *
3079  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3080  */
3081 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3082 {
3083         int rv = -EIO;
3084
3085         drbd_resume_al(mdev);
3086         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3087                 drbd_bm_clear_all(mdev);
3088                 rv = drbd_bm_write(mdev);
3089                 put_ldev(mdev);
3090         }
3091
3092         return rv;
3093 }
3094
3095 static int w_bitmap_io(struct drbd_work *w, int unused)
3096 {
3097         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3098         struct drbd_conf *mdev = w->mdev;
3099         int rv = -EIO;
3100
3101         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3102
3103         if (get_ldev(mdev)) {
3104                 drbd_bm_lock(mdev, work->why, work->flags);
3105                 rv = work->io_fn(mdev);
3106                 drbd_bm_unlock(mdev);
3107                 put_ldev(mdev);
3108         }
3109
3110         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3111         wake_up(&mdev->misc_wait);
3112
3113         if (work->done)
3114                 work->done(mdev, rv);
3115
3116         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3117         work->why = NULL;
3118         work->flags = 0;
3119
3120         return 0;
3121 }
3122
3123 void drbd_ldev_destroy(struct drbd_conf *mdev)
3124 {
3125         lc_destroy(mdev->resync);
3126         mdev->resync = NULL;
3127         lc_destroy(mdev->act_log);
3128         mdev->act_log = NULL;
3129         __no_warn(local,
3130                 drbd_free_bc(mdev->ldev);
3131                 mdev->ldev = NULL;);
3132
3133         clear_bit(GO_DISKLESS, &mdev->flags);
3134 }
3135
3136 static int w_go_diskless(struct drbd_work *w, int unused)
3137 {
3138         struct drbd_conf *mdev = w->mdev;
3139
3140         D_ASSERT(mdev->state.disk == D_FAILED);
3141         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3142          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3143          * the protected members anymore, though, so once put_ldev reaches zero
3144          * again, it will be safe to free them. */
3145         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3146         return 0;
3147 }
3148
3149 void drbd_go_diskless(struct drbd_conf *mdev)
3150 {
3151         D_ASSERT(mdev->state.disk == D_FAILED);
3152         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3153                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3154 }
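/* Note: the test_and_set_bit(GO_DISKLESS) above makes repeated calls
 * harmless while the work is still pending; the flag is cleared again in
 * drbd_ldev_destroy() once the transition to D_DISKLESS is complete. */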
3155
3156 /**
3157  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3158  * @mdev:       DRBD device.
3159  * @io_fn:      IO callback to be called when bitmap IO is possible
3160  * @done:       callback to be called after the bitmap IO was performed
3161  * @why:        Descriptive text of the reason for doing the IO
3162  *
3163  * While IO on the bitmap happens we freeze application IO thus we ensure
3164  * While IO on the bitmap happens we freeze application IO, thus ensuring
3165  * called from worker context. It MUST NOT be used while a previous such
3166  * work is still pending!
3167  */
3168 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3169                           int (*io_fn)(struct drbd_conf *),
3170                           void (*done)(struct drbd_conf *, int),
3171                           char *why, enum bm_flag flags)
3172 {
3173         D_ASSERT(current == mdev->tconn->worker.task);
3174
3175         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3176         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3177         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3178         if (mdev->bm_io_work.why)
3179                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3180                         why, mdev->bm_io_work.why);
3181
3182         mdev->bm_io_work.io_fn = io_fn;
3183         mdev->bm_io_work.done = done;
3184         mdev->bm_io_work.why = why;
3185         mdev->bm_io_work.flags = flags;
3186
3187         spin_lock_irq(&mdev->tconn->req_lock);
3188         set_bit(BITMAP_IO, &mdev->flags);
3189         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3190                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3191                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3192         }
3193         spin_unlock_irq(&mdev->tconn->req_lock);
3194 }
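/* Usage sketch (illustrative only; real call sites live elsewhere in DRBD,
 * and "my_done" is a hypothetical callback with the done() signature):
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &my_done,
 *			     "set all bits on attach", BM_LOCKED_MASK);
 *
 * The io_fn runs from worker context once ap_bio_cnt has drained to zero;
 * done() is then invoked with io_fn's return value. */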
3195
3196 /**
3197  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3198  * @mdev:       DRBD device.
3199  * @io_fn:      IO callback to be called when bitmap IO is possible
3200  * @why:        Descriptive text of the reason for doing the IO
3201  *
3202  * freezes application IO while the actual IO operation runs. This
3203  * function MAY NOT be called from worker context.
3204  */
3205 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3206                 char *why, enum bm_flag flags)
3207 {
3208         int rv;
3209
3210         D_ASSERT(current != mdev->tconn->worker.task);
3211
3212         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3213                 drbd_suspend_io(mdev);
3214
3215         drbd_bm_lock(mdev, why, flags);
3216         rv = io_fn(mdev);
3217         drbd_bm_unlock(mdev);
3218
3219         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3220                 drbd_resume_io(mdev);
3221
3222         return rv;
3223 }
3224
3225 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3226 {
3227         if ((mdev->ldev->md.flags & flag) != flag) {
3228                 drbd_md_mark_dirty(mdev);
3229                 mdev->ldev->md.flags |= flag;
3230         }
3231 }
3232
3233 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3234 {
3235         if ((mdev->ldev->md.flags & flag) != 0) {
3236                 drbd_md_mark_dirty(mdev);
3237                 mdev->ldev->md.flags &= ~flag;
3238         }
3239 }
3240 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3241 {
3242         return (bdev->md.flags & flag) != 0;
3243 }
3244
3245 static void md_sync_timer_fn(unsigned long data)
3246 {
3247         struct drbd_conf *mdev = (struct drbd_conf *) data;
3248
3249         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3250 }
3251
3252 static int w_md_sync(struct drbd_work *w, int unused)
3253 {
3254         struct drbd_conf *mdev = w->mdev;
3255
3256         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3257 #ifdef DEBUG
3258         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3259                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3260 #endif
3261         drbd_md_sync(mdev);
3262         return 0;
3263 }
3264
3265 const char *cmdname(enum drbd_packet cmd)
3266 {
3267         /* THINK may need to become several global tables
3268          * when we want to support more than
3269          * one PRO_VERSION */
3270         static const char *cmdnames[] = {
3271                 [P_DATA]                = "Data",
3272                 [P_DATA_REPLY]          = "DataReply",
3273                 [P_RS_DATA_REPLY]       = "RSDataReply",
3274                 [P_BARRIER]             = "Barrier",
3275                 [P_BITMAP]              = "ReportBitMap",
3276                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3277                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3278                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3279                 [P_DATA_REQUEST]        = "DataRequest",
3280                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3281                 [P_SYNC_PARAM]          = "SyncParam",
3282                 [P_SYNC_PARAM89]        = "SyncParam89",
3283                 [P_PROTOCOL]            = "ReportProtocol",
3284                 [P_UUIDS]               = "ReportUUIDs",
3285                 [P_SIZES]               = "ReportSizes",
3286                 [P_STATE]               = "ReportState",
3287                 [P_SYNC_UUID]           = "ReportSyncUUID",
3288                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3289                 [P_AUTH_RESPONSE]       = "AuthResponse",
3290                 [P_PING]                = "Ping",
3291                 [P_PING_ACK]            = "PingAck",
3292                 [P_RECV_ACK]            = "RecvAck",
3293                 [P_WRITE_ACK]           = "WriteAck",
3294                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3295                 [P_DISCARD_WRITE]       = "DiscardWrite",
3296                 [P_NEG_ACK]             = "NegAck",
3297                 [P_NEG_DREPLY]          = "NegDReply",
3298                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3299                 [P_BARRIER_ACK]         = "BarrierAck",
3300                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3301                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3302                 [P_OV_REQUEST]          = "OVRequest",
3303                 [P_OV_REPLY]            = "OVReply",
3304                 [P_OV_RESULT]           = "OVResult",
3305                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3306                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3307                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3308                 [P_DELAY_PROBE]         = "DelayProbe",
3309                 [P_OUT_OF_SYNC]         = "OutOfSync",
3311                 [P_RS_CANCEL]           = "RSCancel",
3312                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3313                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3314                 [P_RETRY_WRITE]         = "retry_write",
3315                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3316
3317                 /* enum drbd_packet, but not commands - obsoleted flags:
3318                  *      P_MAY_IGNORE
3319                  *      P_MAX_OPT_CMD
3320                  */
3321         };
3322
3323         /* too big for the array: 0xfffX */
3324         if (cmd == P_INITIAL_META)
3325                 return "InitialMeta";
3326         if (cmd == P_INITIAL_DATA)
3327                 return "InitialData";
3328         if (cmd == P_CONNECTION_FEATURES)
3329                 return "ConnectionFeatures";
3330         if (cmd >= ARRAY_SIZE(cmdnames))
3331                 return "Unknown";
3332         return cmdnames[cmd];
3333 }
3334
3335 /**
3336  * drbd_wait_misc  -  wait for a request to make progress
3337  * @mdev:       device associated with the request
3338  * @i:          the struct drbd_interval embedded in struct drbd_request or
3339  *              struct drbd_peer_request
3340  */
3341 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3342 {
3343         struct net_conf *nc;
3344         DEFINE_WAIT(wait);
3345         long timeout;
3346
3347         rcu_read_lock();
3348         nc = rcu_dereference(mdev->tconn->net_conf);
3349         if (!nc) {
3350                 rcu_read_unlock();
3351                 return -ETIMEDOUT;
3352         }
3353         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3354         rcu_read_unlock();
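        /* nc->timeout is configured in tenths of a second, so the expression
         * above is (timeout/10 seconds) * ko_count in jiffies; e.g. with
         * timeout=60 and ko_count=7 the wait gives up after 6 s * 7 = 42 s.
         * A ko_count of 0 means wait without bound. */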
3355
3356         /* Indicate that mdev->misc_wait should be woken up on progress. */
3357         i->waiting = true;
3358         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3359         spin_unlock_irq(&mdev->tconn->req_lock);
3360         timeout = schedule_timeout(timeout);
3361         finish_wait(&mdev->misc_wait, &wait);
3362         spin_lock_irq(&mdev->tconn->req_lock);
3363         if (!timeout || mdev->state.conn < C_CONNECTED)
3364                 return -ETIMEDOUT;
3365         if (signal_pending(current))
3366                 return -ERESTARTSYS;
3367         return 0;
3368 }
3369
3370 #ifdef CONFIG_DRBD_FAULT_INJECTION
3371 /* Fault insertion support including random number generator shamelessly
3372  * stolen from kernel/rcutorture.c */
3373 struct fault_random_state {
3374         unsigned long state;
3375         unsigned long count;
3376 };
3377
3378 #define FAULT_RANDOM_MULT 39916801  /* prime */
3379 #define FAULT_RANDOM_ADD        479001701 /* prime */
3380 #define FAULT_RANDOM_REFRESH 10000
3381
3382 /*
3383  * Crude but fast random-number generator.  Uses a linear congruential
3384  * generator, with occasional help from get_random_bytes().
3385  */
3386 static unsigned long
3387 _drbd_fault_random(struct fault_random_state *rsp)
3388 {
3389         long refresh;
3390
3391         if (!rsp->count--) {
3392                 get_random_bytes(&refresh, sizeof(refresh));
3393                 rsp->state += refresh;
3394                 rsp->count = FAULT_RANDOM_REFRESH;
3395         }
3396         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3397         return swahw32(rsp->state);
3398 }
3399
3400 static char *
3401 _drbd_fault_str(unsigned int type) {
3402         static char *_faults[] = {
3403                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3404                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3405                 [DRBD_FAULT_RS_WR] = "Resync write",
3406                 [DRBD_FAULT_RS_RD] = "Resync read",
3407                 [DRBD_FAULT_DT_WR] = "Data write",
3408                 [DRBD_FAULT_DT_RD] = "Data read",
3409                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3410                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3411                 [DRBD_FAULT_AL_EE] = "EE allocation",
3412                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3413         };
3414
3415         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3416 }
3417
3418 unsigned int
3419 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3420 {
3421         static struct fault_random_state rrs = {0, 0};
3422
3423         unsigned int ret = (
3424                 (fault_devs == 0 ||
3425                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3426                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3427
3428         if (ret) {
3429                 fault_count++;
3430
3431                 if (__ratelimit(&drbd_ratelimit_state))
3432                         dev_warn(DEV, "***Simulating %s failure\n",
3433                                 _drbd_fault_str(type));
3434         }
3435
3436         return ret;
3437 }
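/* Reading of the expression in _drbd_insert_fault(): fault_devs is a bitmask
 * of minor numbers (0 selects all devices) and fault_rate is a percentage;
 * a fault is injected when the device matches and a pseudo-random draw in
 * 1..100 is at or below fault_rate. */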
3438 #endif
3439
3440 const char *drbd_buildtag(void)
3441 {
3442         /* DRBD built from external sources carries a reference to the
3443            git hash of the source code here. */
3444
3445         static char buildtag[38] = "\0uilt-in";
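        /* "\0uilt-in": buildtag starts with a NUL byte; if no srcversion is
         * available, writing 'b' into buildtag[0] below turns the string
         * into "built-in". */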
3446
3447         if (buildtag[0] == 0) {
3448 #ifdef CONFIG_MODULES
3449                 if (THIS_MODULE != NULL)
3450                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3451                 else
3452 #endif
3453                         buildtag[0] = 'b';
3454         }
3455
3456         return buildtag;
3457 }
3458
3459 module_init(drbd_init)
3460 module_exit(drbd_cleanup)
3461
3462 EXPORT_SYMBOL(drbd_conn_str);
3463 EXPORT_SYMBOL(drbd_role_str);
3464 EXPORT_SYMBOL(drbd_disk_str);
3465 EXPORT_SYMBOL(drbd_set_st_err_str);