drbd: detach from frozen backing device
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
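/* For illustration (not from the original source): as a module this is
 * "modprobe drbd minor_count=16"; built into the kernel, the same knob
 * becomes the boot parameter "drbd.minor_count=16". */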
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
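/* Sketch of the chaining (an assumption for illustration, not verbatim from
 * this file): pushing a page onto the pool is roughly
 *     set_page_private(page, (unsigned long)drbd_pp_pool);
 *     drbd_pp_pool = page;
 * i.e. page->private acts as the "next" pointer mentioned above. */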
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
198  * attached.
199  */
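/* Illustrative layout (editor's sketch, not part of the original source):
 *
 *   oldest_tle -> epoch #1 -> epoch #2 -> epoch #3 <- newest_tle
 *                    |            |           |
 *                 requests     requests    requests   (circular lists)
 *
 * Barrier acks retire epochs from the oldest end (tl_release), while new
 * barriers are appended at the newest end (_tl_add_barrier). */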
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218         INIT_LIST_HEAD(&tconn->barrier_acked_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
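        /* Note: "x ?: 1" is the GCC conditional-with-omitted-middle-operand
         * extension; it yields x unless x is 0, so the 32-bit barrier number
         * wraps from 0xffffffff straight to 1, skipping 0. */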
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch objects this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307            of the write to the local disk. To avoid corruptions of
308            slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and violate write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_splice_init(&b->requests, &tconn->barrier_acked_requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL b was the only barrier, and becomes the new
328                    barrier. Therefore tconn->oldest_tle points already to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
373                 }
374                 tmp = b->next;
375
376                 if (n_writes) {
377                         if (what == RESEND) {
378                                 b->n_writes = n_writes;
379                                 if (b->w.cb == NULL) {
380                                         b->w.cb = w_send_barrier;
381                                         inc_ap_pending(b->w.mdev);
382                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383                                 }
384
385                                 drbd_queue_work(&tconn->data.work, &b->w);
386                         }
387                         pn = &b->next;
388                 } else {
389                         if (n_reads)
390                                 list_add(&carry_reads, &b->requests);
391                         /* there could still be requests on that ring list,
392                          * in case local io is still pending */
393                         list_del(&b->requests);
394
395                         /* dec_ap_pending corresponding to queue_barrier.
396                          * the newest barrier may not have been queued yet,
397                          * in which case w.cb is still NULL. */
398                         if (b->w.cb != NULL)
399                                 dec_ap_pending(b->w.mdev);
400
401                         if (b == tconn->newest_tle) {
402                                 /* recycle, but reinit! */
403                                 if (tmp != NULL)
404                                         conn_err(tconn, "ASSERT FAILED tmp == NULL\n");
405                                 INIT_LIST_HEAD(&b->requests);
406                                 list_splice(&carry_reads, &b->requests);
407                                 INIT_LIST_HEAD(&b->w.list);
408                                 b->w.cb = NULL;
409                                 b->br_number = net_random();
410                                 b->n_writes = 0;
411
412                                 *pn = b;
413                                 break;
414                         }
415                         *pn = tmp;
416                         kfree(b);
417                 }
418                 b = tmp;
419                 list_splice(&carry_reads, &b->requests);
420         }
421
422         /* Actions operating on the disk state, also want to work on
423            requests that got barrier acked. */
424         switch (what) {
425         case FAIL_FROZEN_DISK_IO:
426         case RESTART_FROZEN_DISK_IO:
427                 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
428                         req = list_entry(le, struct drbd_request, tl_requests);
429                         _req_mod(req, what);
430                 }
431         case CONNECTION_LOST_WHILE_PENDING:
432         case RESEND:
433                 break;
434         default:
435                 conn_err(tconn, "what = %d in _tl_restart()\n", what);
436         }
437 }
438
439 /**
440  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
441  * @tconn:      DRBD connection.
442  *
443  * This is called after the connection to the peer was lost. The storage covered
444  * by the requests on the transfer log gets marked as out of sync. Called from the
445  * receiver thread and the worker thread.
446  */
447 void tl_clear(struct drbd_tconn *tconn)
448 {
449         struct drbd_conf *mdev;
450         struct list_head *le, *tle;
451         struct drbd_request *r;
452         int vnr;
453
454         spin_lock_irq(&tconn->req_lock);
455
456         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
457
458         /* we expect this list to be empty. */
459         if (!list_empty(&tconn->out_of_sequence_requests))
460                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
461
462         /* but just in case, clean it up anyways! */
463         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
464                 r = list_entry(le, struct drbd_request, tl_requests);
465                 /* It would be nice to complete outside of spinlock.
466                  * But this is easier for now. */
467                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
468         }
469
470         /* ensure bit indicating barrier is required is clear */
471         rcu_read_lock();
472         idr_for_each_entry(&tconn->volumes, mdev, vnr)
473                 clear_bit(CREATE_BARRIER, &mdev->flags);
474         rcu_read_unlock();
475
476         spin_unlock_irq(&tconn->req_lock);
477 }
478
479 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
480 {
481         spin_lock_irq(&tconn->req_lock);
482         _tl_restart(tconn, what);
483         spin_unlock_irq(&tconn->req_lock);
484 }
485
486 /**
487  * tl_apply() - Applies an event to all requests for a certain mdev in the TL
488  * @mdev:       DRBD device.
489  * @what:       The action/event to perform with all request objects
490  *
491  * @what may only be ABORT_DISK_IO.
492  */
493 void tl_apply(struct drbd_conf *mdev, enum drbd_req_event what)
494 {
495         struct drbd_tconn *tconn = mdev->tconn;
496         struct drbd_tl_epoch *b;
497         struct list_head *le, *tle;
498         struct drbd_request *req;
499
500         D_ASSERT(what == ABORT_DISK_IO);
501
502         spin_lock_irq(&tconn->req_lock);
503         b = tconn->oldest_tle;
504         while (b) {
505                 list_for_each_safe(le, tle, &b->requests) {
506                         req = list_entry(le, struct drbd_request, tl_requests);
507                         if (req->w.mdev == mdev)
508                                 _req_mod(req, what);
509                 }
510                 b = b->next;
511         }
512
513         list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
514                 req = list_entry(le, struct drbd_request, tl_requests);
515                 if (req->w.mdev == mdev)
516                         _req_mod(req, what);
517         }
518
519         spin_unlock_irq(&tconn->req_lock);
520 }
521
522 static int drbd_thread_setup(void *arg)
523 {
524         struct drbd_thread *thi = (struct drbd_thread *) arg;
525         struct drbd_tconn *tconn = thi->tconn;
526         unsigned long flags;
527         int retval;
528
529         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
530                  thi->name[0], thi->tconn->name);
531
532 restart:
533         retval = thi->function(thi);
534
535         spin_lock_irqsave(&thi->t_lock, flags);
536
537         /* if the receiver has been "EXITING", the last thing it did
538          * was set the conn state to "StandAlone",
539          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
540          * and receiver thread will be "started".
541          * drbd_thread_start needs to set "RESTARTING" in that case.
542          * t_state check and assignment needs to be within the same spinlock,
543          * so either thread_start sees EXITING, and can remap to RESTARTING,
544          * or thread_start see NONE, and can proceed as normal.
545          */
546
547         if (thi->t_state == RESTARTING) {
548                 conn_info(tconn, "Restarting %s thread\n", thi->name);
549                 thi->t_state = RUNNING;
550                 spin_unlock_irqrestore(&thi->t_lock, flags);
551                 goto restart;
552         }
553
554         thi->task = NULL;
555         thi->t_state = NONE;
556         smp_mb();
557         complete_all(&thi->stop);
558         spin_unlock_irqrestore(&thi->t_lock, flags);
559
560         conn_info(tconn, "Terminating %s\n", current->comm);
561
562         /* Release mod reference taken when thread was started */
563
564         kref_put(&tconn->kref, &conn_destroy);
565         module_put(THIS_MODULE);
566         return retval;
567 }
568
569 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
570                              int (*func) (struct drbd_thread *), char *name)
571 {
572         spin_lock_init(&thi->t_lock);
573         thi->task    = NULL;
574         thi->t_state = NONE;
575         thi->function = func;
576         thi->tconn = tconn;
577         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
578 }
579
580 int drbd_thread_start(struct drbd_thread *thi)
581 {
582         struct drbd_tconn *tconn = thi->tconn;
583         struct task_struct *nt;
584         unsigned long flags;
585
586         /* is used from state engine doing drbd_thread_stop_nowait,
587          * while holding the req lock irqsave */
588         spin_lock_irqsave(&thi->t_lock, flags);
589
590         switch (thi->t_state) {
591         case NONE:
592                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
593                          thi->name, current->comm, current->pid);
594
595                 /* Get ref on module for thread - this is released when thread exits */
596                 if (!try_module_get(THIS_MODULE)) {
597                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
598                         spin_unlock_irqrestore(&thi->t_lock, flags);
599                         return false;
600                 }
601
602                 kref_get(&thi->tconn->kref);
603
604                 init_completion(&thi->stop);
605                 thi->reset_cpu_mask = 1;
606                 thi->t_state = RUNNING;
607                 spin_unlock_irqrestore(&thi->t_lock, flags);
608                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
609
610                 nt = kthread_create(drbd_thread_setup, (void *) thi,
611                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
612
613                 if (IS_ERR(nt)) {
614                         conn_err(tconn, "Couldn't start thread\n");
615
616                         kref_put(&tconn->kref, &conn_destroy);
617                         module_put(THIS_MODULE);
618                         return false;
619                 }
620                 spin_lock_irqsave(&thi->t_lock, flags);
621                 thi->task = nt;
622                 thi->t_state = RUNNING;
623                 spin_unlock_irqrestore(&thi->t_lock, flags);
624                 wake_up_process(nt);
625                 break;
626         case EXITING:
627                 thi->t_state = RESTARTING;
628                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
629                                 thi->name, current->comm, current->pid);
630                 /* fall through */
631         case RUNNING:
632         case RESTARTING:
633         default:
634                 spin_unlock_irqrestore(&thi->t_lock, flags);
635                 break;
636         }
637
638         return true;
639 }
640
641
642 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
643 {
644         unsigned long flags;
645
646         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
647
648         /* may be called from state engine, holding the req lock irqsave */
649         spin_lock_irqsave(&thi->t_lock, flags);
650
651         if (thi->t_state == NONE) {
652                 spin_unlock_irqrestore(&thi->t_lock, flags);
653                 if (restart)
654                         drbd_thread_start(thi);
655                 return;
656         }
657
658         if (thi->t_state != ns) {
659                 if (thi->task == NULL) {
660                         spin_unlock_irqrestore(&thi->t_lock, flags);
661                         return;
662                 }
663
664                 thi->t_state = ns;
665                 smp_mb();
666                 init_completion(&thi->stop);
667                 if (thi->task != current)
668                         force_sig(DRBD_SIGKILL, thi->task);
669         }
670
671         spin_unlock_irqrestore(&thi->t_lock, flags);
672
673         if (wait)
674                 wait_for_completion(&thi->stop);
675 }
676
677 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
678 {
679         struct drbd_thread *thi =
680                 task == tconn->receiver.task ? &tconn->receiver :
681                 task == tconn->asender.task  ? &tconn->asender :
682                 task == tconn->worker.task   ? &tconn->worker : NULL;
683
684         return thi;
685 }
686
687 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
688 {
689         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
690         return thi ? thi->name : task->comm;
691 }
692
693 int conn_lowest_minor(struct drbd_tconn *tconn)
694 {
695         struct drbd_conf *mdev;
696         int vnr = 0, m;
697
698         rcu_read_lock();
699         mdev = idr_get_next(&tconn->volumes, &vnr);
700         m = mdev ? mdev_to_minor(mdev) : -1;
701         rcu_read_unlock();
702
703         return m;
704 }
705
706 #ifdef CONFIG_SMP
707 /**
708  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
709  * @tconn:      DRBD connection.
710  *
711  * Forces all threads of a device onto the same CPU. This is beneficial for
712  * DRBD's performance. May be overridden by the user's configuration.
713  */
714 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
715 {
716         int ord, cpu;
717
718         /* user override. */
719         if (cpumask_weight(tconn->cpu_mask))
720                 return;
721
722         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
723         for_each_online_cpu(cpu) {
724                 if (ord-- == 0) {
725                         cpumask_set_cpu(cpu, tconn->cpu_mask);
726                         return;
727                 }
728         }
729         /* should not be reached */
730         cpumask_setall(tconn->cpu_mask);
731 }
732
733 /**
734  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
735  * @mdev:       DRBD device.
736  * @thi:        drbd_thread object
737  *
738  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
739  * prematurely.
740  */
741 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
742 {
743         struct task_struct *p = current;
744
745         if (!thi->reset_cpu_mask)
746                 return;
747         thi->reset_cpu_mask = 0;
748         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
749 }
750 #endif
751
752 /**
753  * drbd_header_size  -  size of a packet header
754  *
755  * The header size is a multiple of 8, so any payload following the header is
756  * word aligned on 64-bit architectures.  (The bitmap send and receive code
757  * relies on this.)
758  */
759 unsigned int drbd_header_size(struct drbd_tconn *tconn)
760 {
761         if (tconn->agreed_pro_version >= 100) {
762                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
763                 return sizeof(struct p_header100);
764         } else {
765                 BUILD_BUG_ON(sizeof(struct p_header80) !=
766                              sizeof(struct p_header95));
767                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
768                 return sizeof(struct p_header80);
769         }
770 }
771
772 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
773 {
774         h->magic   = cpu_to_be32(DRBD_MAGIC);
775         h->command = cpu_to_be16(cmd);
776         h->length  = cpu_to_be16(size);
777         return sizeof(struct p_header80);
778 }
779
780 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
781 {
782         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
783         h->command = cpu_to_be16(cmd);
784         h->length = cpu_to_be32(size);
785         return sizeof(struct p_header95);
786 }
787
788 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
789                                       int size, int vnr)
790 {
791         h->magic = cpu_to_be32(DRBD_MAGIC_100);
792         h->volume = cpu_to_be16(vnr);
793         h->command = cpu_to_be16(cmd);
794         h->length = cpu_to_be32(size);
795         h->pad = 0;
796         return sizeof(struct p_header100);
797 }
798
799 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
800                                    void *buffer, enum drbd_packet cmd, int size)
801 {
802         if (tconn->agreed_pro_version >= 100)
803                 return prepare_header100(buffer, cmd, size, vnr);
804         else if (tconn->agreed_pro_version >= 95 &&
805                  size > DRBD_MAX_SIZE_H80_PACKET)
806                 return prepare_header95(buffer, cmd, size);
807         else
808                 return prepare_header80(buffer, cmd, size);
809 }
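/* Summary of the header selection above: protocol >= 100 always uses the h100
 * header (it carries the volume number); protocols 95..99 switch to the h95
 * header only when the payload would overflow the 16-bit length field of h80;
 * everything else keeps the compact h80 header. */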
810
811 static void *__conn_prepare_command(struct drbd_tconn *tconn,
812                                     struct drbd_socket *sock)
813 {
814         if (!sock->socket)
815                 return NULL;
816         return sock->sbuf + drbd_header_size(tconn);
817 }
818
819 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
820 {
821         void *p;
822
823         mutex_lock(&sock->mutex);
824         p = __conn_prepare_command(tconn, sock);
825         if (!p)
826                 mutex_unlock(&sock->mutex);
827
828         return p;
829 }
830
831 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
832 {
833         return conn_prepare_command(mdev->tconn, sock);
834 }
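/* Locking note: a successful conn_prepare_command()/drbd_prepare_command()
 * returns with sock->mutex held; the matching conn_send_command() or
 * drbd_send_command() drops it after sending, so every prepare must be paired
 * with a send (or with an explicit mutex_unlock() on the error path). */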
835
836 static int __send_command(struct drbd_tconn *tconn, int vnr,
837                           struct drbd_socket *sock, enum drbd_packet cmd,
838                           unsigned int header_size, void *data,
839                           unsigned int size)
840 {
841         int msg_flags;
842         int err;
843
844         /*
845          * Called with @data == NULL and the size of the data blocks in @size
846          * for commands that send data blocks.  For those commands, omit the
847          * MSG_MORE flag: this will increase the likelihood that data blocks
848          * which are page aligned on the sender will end up page aligned on the
849          * receiver.
850          */
851         msg_flags = data ? MSG_MORE : 0;
852
853         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
854                                       header_size + size);
855         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
856                             msg_flags);
857         if (data && !err)
858                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
859         return err;
860 }
861
862 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
863                                enum drbd_packet cmd, unsigned int header_size,
864                                void *data, unsigned int size)
865 {
866         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
867 }
868
869 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
870                       enum drbd_packet cmd, unsigned int header_size,
871                       void *data, unsigned int size)
872 {
873         int err;
874
875         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
876         mutex_unlock(&sock->mutex);
877         return err;
878 }
879
880 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
881                       enum drbd_packet cmd, unsigned int header_size,
882                       void *data, unsigned int size)
883 {
884         int err;
885
886         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
887                              data, size);
888         mutex_unlock(&sock->mutex);
889         return err;
890 }
891
892 int drbd_send_ping(struct drbd_tconn *tconn)
893 {
894         struct drbd_socket *sock;
895
896         sock = &tconn->meta;
897         if (!conn_prepare_command(tconn, sock))
898                 return -EIO;
899         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
900 }
901
902 int drbd_send_ping_ack(struct drbd_tconn *tconn)
903 {
904         struct drbd_socket *sock;
905
906         sock = &tconn->meta;
907         if (!conn_prepare_command(tconn, sock))
908                 return -EIO;
909         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
910 }
911
912 int drbd_send_sync_param(struct drbd_conf *mdev)
913 {
914         struct drbd_socket *sock;
915         struct p_rs_param_95 *p;
916         int size;
917         const int apv = mdev->tconn->agreed_pro_version;
918         enum drbd_packet cmd;
919         struct net_conf *nc;
920         struct disk_conf *dc;
921
922         sock = &mdev->tconn->data;
923         p = drbd_prepare_command(mdev, sock);
924         if (!p)
925                 return -EIO;
926
927         rcu_read_lock();
928         nc = rcu_dereference(mdev->tconn->net_conf);
929
930         size = apv <= 87 ? sizeof(struct p_rs_param)
931                 : apv == 88 ? sizeof(struct p_rs_param)
932                         + strlen(nc->verify_alg) + 1
933                 : apv <= 94 ? sizeof(struct p_rs_param_89)
934                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
935
936         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
937
938         /* initialize verify_alg and csums_alg */
939         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
940
941         if (get_ldev(mdev)) {
942                 dc = rcu_dereference(mdev->ldev->disk_conf);
943                 p->resync_rate = cpu_to_be32(dc->resync_rate);
944                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
945                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
946                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
947                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
948                 put_ldev(mdev);
949         } else {
950                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
951                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
952                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
953                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
954                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
955         }
956
957         if (apv >= 88)
958                 strcpy(p->verify_alg, nc->verify_alg);
959         if (apv >= 89)
960                 strcpy(p->csums_alg, nc->csums_alg);
961         rcu_read_unlock();
962
963         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
964 }
965
966 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
967 {
968         struct drbd_socket *sock;
969         struct p_protocol *p;
970         struct net_conf *nc;
971         int size, cf;
972
973         sock = &tconn->data;
974         p = __conn_prepare_command(tconn, sock);
975         if (!p)
976                 return -EIO;
977
978         rcu_read_lock();
979         nc = rcu_dereference(tconn->net_conf);
980
981         if (nc->tentative && tconn->agreed_pro_version < 92) {
982                 rcu_read_unlock();
983                 mutex_unlock(&sock->mutex);
984                 conn_err(tconn, "--dry-run is not supported by peer");
985                 return -EOPNOTSUPP;
986         }
987
988         size = sizeof(*p);
989         if (tconn->agreed_pro_version >= 87)
990                 size += strlen(nc->integrity_alg) + 1;
991
992         p->protocol      = cpu_to_be32(nc->wire_protocol);
993         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
994         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
995         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
996         p->two_primaries = cpu_to_be32(nc->two_primaries);
997         cf = 0;
998         if (nc->discard_my_data)
999                 cf |= CF_DISCARD_MY_DATA;
1000         if (nc->tentative)
1001                 cf |= CF_DRY_RUN;
1002         p->conn_flags    = cpu_to_be32(cf);
1003
1004         if (tconn->agreed_pro_version >= 87)
1005                 strcpy(p->integrity_alg, nc->integrity_alg);
1006         rcu_read_unlock();
1007
1008         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
1009 }
1010
1011 int drbd_send_protocol(struct drbd_tconn *tconn)
1012 {
1013         int err;
1014
1015         mutex_lock(&tconn->data.mutex);
1016         err = __drbd_send_protocol(tconn, P_PROTOCOL);
1017         mutex_unlock(&tconn->data.mutex);
1018
1019         return err;
1020 }
1021
1022 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1023 {
1024         struct drbd_socket *sock;
1025         struct p_uuids *p;
1026         int i;
1027
1028         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1029                 return 0;
1030
1031         sock = &mdev->tconn->data;
1032         p = drbd_prepare_command(mdev, sock);
1033         if (!p) {
1034                 put_ldev(mdev);
1035                 return -EIO;
1036         }
1037         for (i = UI_CURRENT; i < UI_SIZE; i++)
1038                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1039
1040         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1041         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1042         rcu_read_lock();
1043         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
1044         rcu_read_unlock();
1045         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1046         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1047         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
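        /* Flag bits stored in UI_FLAGS above: 1 = discard-my-data is set,
         * 2 = we crashed while primary, 4 = our disk was inconsistent when
         * negotiation started; drbd_send_uuids_skip_initial_sync() below
         * additionally passes in the value 8. */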
1048
1049         put_ldev(mdev);
1050         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
1051 }
1052
1053 int drbd_send_uuids(struct drbd_conf *mdev)
1054 {
1055         return _drbd_send_uuids(mdev, 0);
1056 }
1057
1058 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1059 {
1060         return _drbd_send_uuids(mdev, 8);
1061 }
1062
1063 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1064 {
1065         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1066                 u64 *uuid = mdev->ldev->md.uuid;
1067                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1068                      text,
1069                      (unsigned long long)uuid[UI_CURRENT],
1070                      (unsigned long long)uuid[UI_BITMAP],
1071                      (unsigned long long)uuid[UI_HISTORY_START],
1072                      (unsigned long long)uuid[UI_HISTORY_END]);
1073                 put_ldev(mdev);
1074         } else {
1075                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1076                                 text,
1077                                 (unsigned long long)mdev->ed_uuid);
1078         }
1079 }
1080
1081 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1082 {
1083         struct drbd_socket *sock;
1084         struct p_rs_uuid *p;
1085         u64 uuid;
1086
1087         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1088
1089         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1090         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1091         drbd_print_uuids(mdev, "updated sync UUID");
1092         drbd_md_sync(mdev);
1093
1094         sock = &mdev->tconn->data;
1095         p = drbd_prepare_command(mdev, sock);
1096         if (p) {
1097                 p->uuid = cpu_to_be64(uuid);
1098                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1099         }
1100 }
1101
1102 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1103 {
1104         struct drbd_socket *sock;
1105         struct p_sizes *p;
1106         sector_t d_size, u_size;
1107         int q_order_type, max_bio_size;
1108
1109         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1110                 D_ASSERT(mdev->ldev->backing_bdev);
1111                 d_size = drbd_get_max_capacity(mdev->ldev);
1112                 rcu_read_lock();
1113                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1114                 rcu_read_unlock();
1115                 q_order_type = drbd_queue_order_type(mdev);
1116                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1117                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1118                 put_ldev(mdev);
1119         } else {
1120                 d_size = 0;
1121                 u_size = 0;
1122                 q_order_type = QUEUE_ORDERED_NONE;
1123                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1124         }
1125
1126         sock = &mdev->tconn->data;
1127         p = drbd_prepare_command(mdev, sock);
1128         if (!p)
1129                 return -EIO;
1130
1131         if (mdev->tconn->agreed_pro_version <= 94)
1132                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
1133         else if (mdev->tconn->agreed_pro_version < 100)
1134                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
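        /* The clamps above reflect what older peers can handle: protocol <= 94
         * is limited to DRBD_MAX_SIZE_H80_PACKET (the h80 on-the-wire limit),
         * protocols 95..99 to DRBD_MAX_BIO_SIZE_P95. */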
1135
1136         p->d_size = cpu_to_be64(d_size);
1137         p->u_size = cpu_to_be64(u_size);
1138         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1139         p->max_bio_size = cpu_to_be32(max_bio_size);
1140         p->queue_order_type = cpu_to_be16(q_order_type);
1141         p->dds_flags = cpu_to_be16(flags);
1142         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1143 }
1144
1145 /**
1146  * drbd_send_state() - Sends the drbd state to the peer
1147  * @mdev:       DRBD device.
1148  */
1149 int drbd_send_state(struct drbd_conf *mdev)
1150 {
1151         struct drbd_socket *sock;
1152         struct p_state *p;
1153
1154         sock = &mdev->tconn->data;
1155         p = drbd_prepare_command(mdev, sock);
1156         if (!p)
1157                 return -EIO;
1158         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1159         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1160 }
1161
1162 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1163 {
1164         struct drbd_socket *sock;
1165         struct p_req_state *p;
1166
1167         sock = &mdev->tconn->data;
1168         p = drbd_prepare_command(mdev, sock);
1169         if (!p)
1170                 return -EIO;
1171         p->mask = cpu_to_be32(mask.i);
1172         p->val = cpu_to_be32(val.i);
1173         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1174
1175 }
1176
1177 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1178 {
1179         enum drbd_packet cmd;
1180         struct drbd_socket *sock;
1181         struct p_req_state *p;
1182
1183         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1184         sock = &tconn->data;
1185         p = conn_prepare_command(tconn, sock);
1186         if (!p)
1187                 return -EIO;
1188         p->mask = cpu_to_be32(mask.i);
1189         p->val = cpu_to_be32(val.i);
1190         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1191 }
1192
1193 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1194 {
1195         struct drbd_socket *sock;
1196         struct p_req_state_reply *p;
1197
1198         sock = &mdev->tconn->meta;
1199         p = drbd_prepare_command(mdev, sock);
1200         if (p) {
1201                 p->retcode = cpu_to_be32(retcode);
1202                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1203         }
1204 }
1205
1206 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1207 {
1208         struct drbd_socket *sock;
1209         struct p_req_state_reply *p;
1210         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1211
1212         sock = &tconn->meta;
1213         p = conn_prepare_command(tconn, sock);
1214         if (p) {
1215                 p->retcode = cpu_to_be32(retcode);
1216                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1217         }
1218 }
1219
1220 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1221 {
1222         BUG_ON(code & ~0xf);
1223         p->encoding = (p->encoding & ~0xf) | code;
1224 }
1225
1226 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1227 {
1228         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1229 }
1230
1231 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1232 {
1233         BUG_ON(n & ~0x7);
1234         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1235 }
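/* Together these helpers pack the RLE header byte p->encoding as:
 *   bits 0..3  bitmap encoding (enum drbd_bitmap_code)
 *   bits 4..6  number of pad bits at the end of the bitstream
 *   bit  7     whether the first run length describes set bits */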
1236
1237 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1238                          struct p_compressed_bm *p,
1239                          unsigned int size,
1240                          struct bm_xfer_ctx *c)
1241 {
1242         struct bitstream bs;
1243         unsigned long plain_bits;
1244         unsigned long tmp;
1245         unsigned long rl;
1246         unsigned len;
1247         unsigned toggle;
1248         int bits, use_rle;
1249
1250         /* may we use this feature? */
1251         rcu_read_lock();
1252         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1253         rcu_read_unlock();
1254         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1255                 return 0;
1256
1257         if (c->bit_offset >= c->bm_bits)
1258                 return 0; /* nothing to do. */
1259
1260         /* use at most thus many bytes */
1261         bitstream_init(&bs, p->code, size, 0);
1262         memset(p->code, 0, size);
1263         /* plain bits covered in this code string */
1264         plain_bits = 0;
1265
1266         /* p->encoding & 0x80 stores whether the first run length is set.
1267          * bit offset is implicit.
1268          * start with toggle == 2 to be able to tell the first iteration */
1269         toggle = 2;
1270
1271         /* see how much plain bits we can stuff into one packet
1272          * using RLE and VLI. */
1273         do {
1274                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1275                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1276                 if (tmp == -1UL)
1277                         tmp = c->bm_bits;
1278                 rl = tmp - c->bit_offset;
1279
1280                 if (toggle == 2) { /* first iteration */
1281                         if (rl == 0) {
1282                                 /* the first checked bit was set,
1283                                  * store start value, */
1284                                 dcbp_set_start(p, 1);
1285                                 /* but skip encoding of zero run length */
1286                                 toggle = !toggle;
1287                                 continue;
1288                         }
1289                         dcbp_set_start(p, 0);
1290                 }
1291
1292                 /* paranoia: catch zero runlength.
1293                  * can only happen if bitmap is modified while we scan it. */
1294                 if (rl == 0) {
1295                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1296                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1297                         return -1;
1298                 }
1299
1300                 bits = vli_encode_bits(&bs, rl);
1301                 if (bits == -ENOBUFS) /* buffer full */
1302                         break;
1303                 if (bits <= 0) {
1304                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1305                         return 0;
1306                 }
1307
1308                 toggle = !toggle;
1309                 plain_bits += rl;
1310                 c->bit_offset = tmp;
1311         } while (c->bit_offset < c->bm_bits);
1312
1313         len = bs.cur.b - p->code + !!bs.cur.bit;
1314
1315         if (plain_bits < (len << 3)) {
1316                 /* incompressible with this method.
1317                  * we need to rewind both word and bit position. */
1318                 c->bit_offset -= plain_bits;
1319                 bm_xfer_ctx_bit_to_word_offset(c);
1320                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1321                 return 0;
1322         }
1323
1324         /* RLE + VLI was able to compress it just fine.
1325          * update c->word_offset. */
1326         bm_xfer_ctx_bit_to_word_offset(c);
1327
1328         /* store pad_bits */
1329         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1330
1331         return len;
1332 }
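/* Worked example (illustrative only): a chunk of 4 clear bits followed by
 * 12 set bits encodes as start=0 (first run is of clear bits) and the
 * VLI-coded run lengths 4, 12, ...  If the resulting code string is not
 * smaller than the plain bits it covers, fill_bitmap_rle_bits() returns 0
 * and send_bitmap_rle_or_plain() falls back to sending plain bitmap words. */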
1333
1334 /**
1335  * send_bitmap_rle_or_plain
1336  *
1337  * Return 0 when done, 1 when another iteration is needed, and a negative error
1338  * code upon failure.
1339  */
1340 static int
1341 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1342 {
1343         struct drbd_socket *sock = &mdev->tconn->data;
1344         unsigned int header_size = drbd_header_size(mdev->tconn);
1345         struct p_compressed_bm *p = sock->sbuf + header_size;
1346         int len, err;
1347
1348         len = fill_bitmap_rle_bits(mdev, p,
1349                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1350         if (len < 0)
1351                 return -EIO;
1352
1353         if (len) {
1354                 dcbp_set_code(p, RLE_VLI_Bits);
1355                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1356                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1357                                      NULL, 0);
1358                 c->packets[0]++;
1359                 c->bytes[0] += header_size + sizeof(*p) + len;
1360
1361                 if (c->bit_offset >= c->bm_bits)
1362                         len = 0; /* DONE */
1363         } else {
1364                 /* was not compressible.
1365                  * send a buffer full of plain text bits instead. */
1366                 unsigned int data_size;
1367                 unsigned long num_words;
1368                 unsigned long *p = sock->sbuf + header_size;
1369
1370                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1371                 num_words = min_t(size_t, data_size / sizeof(*p),
1372                                   c->bm_words - c->word_offset);
1373                 len = num_words * sizeof(*p);
1374                 if (len)
1375                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1376                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1377                 c->word_offset += num_words;
1378                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1379
1380                 c->packets[1]++;
1381                 c->bytes[1] += header_size + len;
1382
1383                 if (c->bit_offset > c->bm_bits)
1384                         c->bit_offset = c->bm_bits;
1385         }
1386         if (!err) {
1387                 if (len == 0) {
1388                         INFO_bm_xfer_stats(mdev, "send", c);
1389                         return 0;
1390                 } else
1391                         return 1;
1392         }
1393         return -EIO;
1394 }
1395
1396 /* See the comment at receive_bitmap() */
1397 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1398 {
1399         struct bm_xfer_ctx c;
1400         int err;
1401
1402         if (!expect(mdev->bitmap))
1403                 return false;
1404
1405         if (get_ldev(mdev)) {
1406                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1407                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1408                         drbd_bm_set_all(mdev);
1409                         if (drbd_bm_write(mdev)) {
1410                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1411                                  * but otherwise process as per normal - need to tell other
1412                                  * side that a full resync is required! */
1413                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1414                         } else {
1415                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1416                                 drbd_md_sync(mdev);
1417                         }
1418                 }
1419                 put_ldev(mdev);
1420         }
1421
1422         c = (struct bm_xfer_ctx) {
1423                 .bm_bits = drbd_bm_bits(mdev),
1424                 .bm_words = drbd_bm_words(mdev),
1425         };
1426
1427         do {
1428                 err = send_bitmap_rle_or_plain(mdev, &c);
1429         } while (err > 0);
1430
1431         return err == 0;
1432 }
1433
1434 int drbd_send_bitmap(struct drbd_conf *mdev)
1435 {
1436         struct drbd_socket *sock = &mdev->tconn->data;
1437         int err = -1;
1438
1439         mutex_lock(&sock->mutex);
1440         if (sock->socket)
1441                 err = !_drbd_send_bitmap(mdev);
1442         mutex_unlock(&sock->mutex);
1443         return err;
1444 }
1445
1446 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1447 {
1448         struct drbd_socket *sock;
1449         struct p_barrier_ack *p;
1450
1451         if (mdev->state.conn < C_CONNECTED)
1452                 return;
1453
1454         sock = &mdev->tconn->meta;
1455         p = drbd_prepare_command(mdev, sock);
1456         if (!p)
1457                 return;
1458         p->barrier = barrier_nr;
1459         p->set_size = cpu_to_be32(set_size);
1460         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1461 }
1462
1463 /**
1464  * _drbd_send_ack() - Sends an ack packet
1465  * @mdev:       DRBD device.
1466  * @cmd:        Packet command code.
1467  * @sector:     sector, needs to be in big endian byte order
1468  * @blksize:    size in byte, needs to be in big endian byte order
1469  * @block_id:   Id, big endian byte order
1470  */
1471 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1472                           u64 sector, u32 blksize, u64 block_id)
1473 {
1474         struct drbd_socket *sock;
1475         struct p_block_ack *p;
1476
1477         if (mdev->state.conn < C_CONNECTED)
1478                 return -EIO;
1479
1480         sock = &mdev->tconn->meta;
1481         p = drbd_prepare_command(mdev, sock);
1482         if (!p)
1483                 return -EIO;
1484         p->sector = sector;
1485         p->block_id = block_id;
1486         p->blksize = blksize;
1487         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1488         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1489 }
1490
1491 /* dp->sector and dp->block_id already/still in network byte order,
1492  * data_size is payload size according to dp->head,
1493  * and may need to be corrected for digest size. */
1494 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1495                       struct p_data *dp, int data_size)
1496 {
1497         if (mdev->tconn->peer_integrity_tfm)
1498                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1499         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1500                        dp->block_id);
1501 }
1502
1503 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1504                       struct p_block_req *rp)
1505 {
1506         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1507 }
1508
1509 /**
1510  * drbd_send_ack() - Sends an ack packet
1511  * @mdev:       DRBD device
1512  * @cmd:        packet command code
1513  * @peer_req:   peer request
1514  */
1515 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1516                   struct drbd_peer_request *peer_req)
1517 {
1518         return _drbd_send_ack(mdev, cmd,
1519                               cpu_to_be64(peer_req->i.sector),
1520                               cpu_to_be32(peer_req->i.size),
1521                               peer_req->block_id);
1522 }
1523
1524 /* This function misuses the block_id field to signal whether the blocks
1525  * are in sync or not. */
1526 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1527                      sector_t sector, int blksize, u64 block_id)
1528 {
1529         return _drbd_send_ack(mdev, cmd,
1530                               cpu_to_be64(sector),
1531                               cpu_to_be32(blksize),
1532                               cpu_to_be64(block_id));
1533 }
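/*
 * Editor's illustrative sketch, not driver code: a resync-path caller would
 * report a verified-clean block through the (mis)used block_id field
 * described above roughly like this.  "blocks_match" and "peer_req" are
 * hypothetical locals; the packet and ID constants are hedged assumptions
 * about the constants used elsewhere in drbd.
 *
 *	if (blocks_match)
 *		drbd_send_ack_ex(mdev, P_RS_IS_IN_SYNC,
 *				 peer_req->i.sector, peer_req->i.size,
 *				 ID_IN_SYNC);
 */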
1534
1535 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1536                        sector_t sector, int size, u64 block_id)
1537 {
1538         struct drbd_socket *sock;
1539         struct p_block_req *p;
1540
1541         sock = &mdev->tconn->data;
1542         p = drbd_prepare_command(mdev, sock);
1543         if (!p)
1544                 return -EIO;
1545         p->sector = cpu_to_be64(sector);
1546         p->block_id = block_id;
1547         p->blksize = cpu_to_be32(size);
1548         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1549 }
1550
1551 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1552                             void *digest, int digest_size, enum drbd_packet cmd)
1553 {
1554         struct drbd_socket *sock;
1555         struct p_block_req *p;
1556
1557         /* FIXME: Put the digest into the preallocated socket buffer.  */
1558
1559         sock = &mdev->tconn->data;
1560         p = drbd_prepare_command(mdev, sock);
1561         if (!p)
1562                 return -EIO;
1563         p->sector = cpu_to_be64(sector);
1564         p->block_id = ID_SYNCER /* unused */;
1565         p->blksize = cpu_to_be32(size);
1566         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1567                                  digest, digest_size);
1568 }
1569
1570 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1571 {
1572         struct drbd_socket *sock;
1573         struct p_block_req *p;
1574
1575         sock = &mdev->tconn->data;
1576         p = drbd_prepare_command(mdev, sock);
1577         if (!p)
1578                 return -EIO;
1579         p->sector = cpu_to_be64(sector);
1580         p->block_id = ID_SYNCER /* unused */;
1581         p->blksize = cpu_to_be32(size);
1582         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1583 }
1584
1585 /* called on sndtimeo
1586  * returns false if we should retry,
1587  * true if we think connection is dead
1588  */
1589 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1590 {
1591         int drop_it;
1592         /* long elapsed = (long)(jiffies - mdev->last_received); */
1593
1594         drop_it =   tconn->meta.socket == sock
1595                 || !tconn->asender.task
1596                 || get_t_state(&tconn->asender) != RUNNING
1597                 || tconn->cstate < C_WF_REPORT_PARAMS;
1598
1599         if (drop_it)
1600                 return true;
1601
1602         drop_it = !--tconn->ko_count;
1603         if (!drop_it) {
1604                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1605                          current->comm, current->pid, tconn->ko_count);
1606                 request_ping(tconn);
1607         }
1608
1609         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1610 }
1611
1612 static void drbd_update_congested(struct drbd_tconn *tconn)
1613 {
1614         struct sock *sk = tconn->data.socket->sk;
1615         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1616                 set_bit(NET_CONGESTED, &tconn->flags);
1617 }
1618
1619 /* The idea of sendpage seems to be to put some kind of reference
1620  * to the page into the skb, and to hand it over to the NIC. In
1621  * this process get_page() gets called.
1622  *
1623  * As soon as the page was really sent over the network put_page()
1624  * gets called by some part of the network layer. [ NIC driver? ]
1625  *
1626  * [ get_page() / put_page() increment/decrement the count. If count
1627  *   reaches 0 the page will be freed. ]
1628  *
1629  * This works nicely with pages from FSs.
1630  * But this means that in protocol A we might signal IO completion too early!
1631  *
1632  * In order not to corrupt data during a resync we must make sure
1633  * that we do not reuse our own buffer pages (EEs) too early, therefore
1634  * we have the net_ee list.
1635  *
1636  * XFS still seems to have problems: it submits pages with page_count == 0!
1637  * As a workaround, we disable sendpage on pages
1638  * with page_count == 0 or PageSlab.
1639  */
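/*
 * Editor's note, summarizing the rule above as a sketch; the actual check
 * lives in _drbd_send_page() below:
 *
 *	if (disable_sendpage || page_count(page) < 1 || PageSlab(page))
 *		fall back to _drbd_no_send_page()	(kmap + sendmsg copy)
 *	else
 *		use socket->ops->sendpage()		(zero copy)
 */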
1640 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1641                               int offset, size_t size, unsigned msg_flags)
1642 {
1643         struct socket *socket;
1644         void *addr;
1645         int err;
1646
1647         socket = mdev->tconn->data.socket;
1648         addr = kmap(page) + offset;
1649         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1650         kunmap(page);
1651         if (!err)
1652                 mdev->send_cnt += size >> 9;
1653         return err;
1654 }
1655
1656 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1657                     int offset, size_t size, unsigned msg_flags)
1658 {
1659         struct socket *socket = mdev->tconn->data.socket;
1660         mm_segment_t oldfs = get_fs();
1661         int len = size;
1662         int err = -EIO;
1663
1664         /* e.g. XFS meta- & log-data is in slab pages, which have a
1665          * page_count of 0 and/or have PageSlab() set.
1666          * we cannot use send_page for those, as that does get_page();
1667          * put_page(); and would cause either a VM_BUG directly, or
1668          * __page_cache_release a page that would actually still be referenced
1669          * by someone, leading to some obscure delayed Oops somewhere else. */
1670         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1671                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1672
1673         msg_flags |= MSG_NOSIGNAL;
1674         drbd_update_congested(mdev->tconn);
1675         set_fs(KERNEL_DS);
1676         do {
1677                 int sent;
1678
1679                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1680                 if (sent <= 0) {
1681                         if (sent == -EAGAIN) {
1682                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1683                                         break;
1684                                 continue;
1685                         }
1686                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1687                              __func__, (int)size, len, sent);
1688                         if (sent < 0)
1689                                 err = sent;
1690                         break;
1691                 }
1692                 len    -= sent;
1693                 offset += sent;
1694         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1695         set_fs(oldfs);
1696         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1697
1698         if (len == 0) {
1699                 err = 0;
1700                 mdev->send_cnt += size >> 9;
1701         }
1702         return err;
1703 }
1704
1705 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1706 {
1707         struct bio_vec *bvec;
1708         int i;
1709         /* hint all but last page with MSG_MORE */
1710         __bio_for_each_segment(bvec, bio, i, 0) {
1711                 int err;
1712
1713                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1714                                          bvec->bv_offset, bvec->bv_len,
1715                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1716                 if (err)
1717                         return err;
1718         }
1719         return 0;
1720 }
1721
1722 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1723 {
1724         struct bio_vec *bvec;
1725         int i;
1726         /* hint all but last page with MSG_MORE */
1727         __bio_for_each_segment(bvec, bio, i, 0) {
1728                 int err;
1729
1730                 err = _drbd_send_page(mdev, bvec->bv_page,
1731                                       bvec->bv_offset, bvec->bv_len,
1732                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1733                 if (err)
1734                         return err;
1735         }
1736         return 0;
1737 }
1738
1739 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1740                             struct drbd_peer_request *peer_req)
1741 {
1742         struct page *page = peer_req->pages;
1743         unsigned len = peer_req->i.size;
1744         int err;
1745
1746         /* hint all but last page with MSG_MORE */
1747         page_chain_for_each(page) {
1748                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1749
1750                 err = _drbd_send_page(mdev, page, 0, l,
1751                                       page_chain_next(page) ? MSG_MORE : 0);
1752                 if (err)
1753                         return err;
1754                 len -= l;
1755         }
1756         return 0;
1757 }
1758
1759 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1760 {
1761         if (mdev->tconn->agreed_pro_version >= 95)
1762                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1763                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1764                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1765                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1766         else
1767                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1768 }
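/*
 * Editor's note (illustrative): for peers speaking protocol 95 or newer, a
 * bio with REQ_FLUSH and REQ_FUA set is thus announced as DP_FLUSH | DP_FUA
 * on the wire; older peers are only ever told about REQ_SYNC (as DP_RW_SYNC).
 */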
1769
1770 /* Used to send write requests
1771  * R_PRIMARY -> Peer    (P_DATA)
1772  */
1773 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1774 {
1775         struct drbd_socket *sock;
1776         struct p_data *p;
1777         unsigned int dp_flags = 0;
1778         int dgs;
1779         int err;
1780
1781         sock = &mdev->tconn->data;
1782         p = drbd_prepare_command(mdev, sock);
1783         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1784                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1785
1786         if (!p)
1787                 return -EIO;
1788         p->sector = cpu_to_be64(req->i.sector);
1789         p->block_id = (unsigned long)req;
1790         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1791         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1792         if (mdev->state.conn >= C_SYNC_SOURCE &&
1793             mdev->state.conn <= C_PAUSED_SYNC_T)
1794                 dp_flags |= DP_MAY_SET_IN_SYNC;
1795         if (mdev->tconn->agreed_pro_version >= 100) {
1796                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1797                         dp_flags |= DP_SEND_RECEIVE_ACK;
1798                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1799                         dp_flags |= DP_SEND_WRITE_ACK;
1800         }
1801         p->dp_flags = cpu_to_be32(dp_flags);
1802         if (dgs)
1803                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1804         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1805         if (!err) {
1806                 /* For protocol A, we have to memcpy the payload into
1807                  * socket buffers, as we may complete right away
1808                  * as soon as we handed it over to tcp, at which point the data
1809                  * pages may become invalid.
1810                  *
1811                  * If data integrity is enabled, we copy it as well, so we can be
1812                  * sure that even if the bio pages are still being modified, this
1813                  * won't change the data on the wire; thus, if the digest checks
1814                  * out ok after sending on this side but does not match on the
1815                  * receiving side, we know the corruption happened elsewhere.
1816                  */
1817                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1818                         err = _drbd_send_bio(mdev, req->master_bio);
1819                 else
1820                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1821
1822                 /* double check digest, sometimes buffers have been modified in flight. */
1823                 if (dgs > 0 && dgs <= 64) {
1824                         /* 64 byte, 512 bit, is the largest digest size
1825                          * currently supported in kernel crypto. */
1826                         unsigned char digest[64];
1827                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1828                         if (memcmp(p + 1, digest, dgs)) {
1829                                 dev_warn(DEV,
1830                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1831                                         (unsigned long long)req->i.sector, req->i.size);
1832                         }
1833                 } /* else if (dgs > 64) {
1834                      ... Be noisy about digest too large ...
1835                 } */
1836         }
1837         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1838
1839         return err;
1840 }
1841
1842 /* answer packet, used to send data back for read requests:
1843  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1844  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1845  */
1846 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1847                     struct drbd_peer_request *peer_req)
1848 {
1849         struct drbd_socket *sock;
1850         struct p_data *p;
1851         int err;
1852         int dgs;
1853
1854         sock = &mdev->tconn->data;
1855         p = drbd_prepare_command(mdev, sock);
1856
1857         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1858                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1859
1860         if (!p)
1861                 return -EIO;
1862         p->sector = cpu_to_be64(peer_req->i.sector);
1863         p->block_id = peer_req->block_id;
1864         p->seq_num = 0;  /* unused */
1865         if (dgs)
1866                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1867         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1868         if (!err)
1869                 err = _drbd_send_zc_ee(mdev, peer_req);
1870         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1871
1872         return err;
1873 }
1874
1875 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1876 {
1877         struct drbd_socket *sock;
1878         struct p_block_desc *p;
1879
1880         sock = &mdev->tconn->data;
1881         p = drbd_prepare_command(mdev, sock);
1882         if (!p)
1883                 return -EIO;
1884         p->sector = cpu_to_be64(req->i.sector);
1885         p->blksize = cpu_to_be32(req->i.size);
1886         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1887 }
1888
1889 /*
1890   drbd_send distinguishes two cases:
1891
1892   Packets sent via the data socket "sock"
1893   and packets sent via the meta data socket "msock"
1894
1895                     sock                      msock
1896   -----------------+-------------------------+------------------------------
1897   timeout           conf.timeout / 2          conf.timeout / 2
1898   timeout action    send a ping via msock     Abort communication
1899                                               and close all sockets
1900 */
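/*
 * Editor's note (illustrative): in this file the split above corresponds to
 * tconn->data (bulk payloads, e.g. drbd_send_dblock()) versus tconn->meta
 * (small control packets, e.g. _drbd_send_ack() and drbd_send_b_ack()).
 */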
1901
1902 /*
1903  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1904  */
1905 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1906               void *buf, size_t size, unsigned msg_flags)
1907 {
1908         struct kvec iov;
1909         struct msghdr msg;
1910         int rv, sent = 0;
1911
1912         if (!sock)
1913                 return -EBADR;
1914
1915         /* THINK  if (signal_pending) return ... ? */
1916
1917         iov.iov_base = buf;
1918         iov.iov_len  = size;
1919
1920         msg.msg_name       = NULL;
1921         msg.msg_namelen    = 0;
1922         msg.msg_control    = NULL;
1923         msg.msg_controllen = 0;
1924         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1925
1926         if (sock == tconn->data.socket) {
1927                 rcu_read_lock();
1928                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1929                 rcu_read_unlock();
1930                 drbd_update_congested(tconn);
1931         }
1932         do {
1933                 /* STRANGE
1934                  * tcp_sendmsg does _not_ use its size parameter at all ?
1935                  *
1936                  * -EAGAIN on timeout, -EINTR on signal.
1937                  */
1938 /* THINK
1939  * do we need to block DRBD_SIG if sock == &meta.socket ??
1940  * otherwise wake_asender() might interrupt some send_*Ack !
1941  */
1942                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1943                 if (rv == -EAGAIN) {
1944                         if (we_should_drop_the_connection(tconn, sock))
1945                                 break;
1946                         else
1947                                 continue;
1948                 }
1949                 if (rv == -EINTR) {
1950                         flush_signals(current);
1951                         rv = 0;
1952                 }
1953                 if (rv < 0)
1954                         break;
1955                 sent += rv;
1956                 iov.iov_base += rv;
1957                 iov.iov_len  -= rv;
1958         } while (sent < size);
1959
1960         if (sock == tconn->data.socket)
1961                 clear_bit(NET_CONGESTED, &tconn->flags);
1962
1963         if (rv <= 0) {
1964                 if (rv != -EAGAIN) {
1965                         conn_err(tconn, "%s_sendmsg returned %d\n",
1966                                  sock == tconn->meta.socket ? "msock" : "sock",
1967                                  rv);
1968                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1969                 } else
1970                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1971         }
1972
1973         return sent;
1974 }
1975
1976 /**
1977  * drbd_send_all  -  Send an entire buffer
1978  *
1979  * Returns 0 upon success and a negative error value otherwise.
1980  */
1981 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1982                   size_t size, unsigned msg_flags)
1983 {
1984         int err;
1985
1986         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1987         if (err < 0)
1988                 return err;
1989         if (err != size)
1990                 return -EIO;
1991         return 0;
1992 }
1993
1994 static int drbd_open(struct block_device *bdev, fmode_t mode)
1995 {
1996         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1997         unsigned long flags;
1998         int rv = 0;
1999
2000         mutex_lock(&drbd_main_mutex);
2001         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
2002         /* to have a stable mdev->state.role
2003          * and no race with updating open_cnt */
2004
2005         if (mdev->state.role != R_PRIMARY) {
2006                 if (mode & FMODE_WRITE)
2007                         rv = -EROFS;
2008                 else if (!allow_oos)
2009                         rv = -EMEDIUMTYPE;
2010         }
2011
2012         if (!rv)
2013                 mdev->open_cnt++;
2014         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2015         mutex_unlock(&drbd_main_mutex);
2016
2017         return rv;
2018 }
2019
2020 static int drbd_release(struct gendisk *gd, fmode_t mode)
2021 {
2022         struct drbd_conf *mdev = gd->private_data;
2023         mutex_lock(&drbd_main_mutex);
2024         mdev->open_cnt--;
2025         mutex_unlock(&drbd_main_mutex);
2026         return 0;
2027 }
2028
2029 static void drbd_set_defaults(struct drbd_conf *mdev)
2030 {
2031         /* Beware! The actual layout differs
2032          * between big endian and little endian */
2033         mdev->state = (union drbd_dev_state) {
2034                 { .role = R_SECONDARY,
2035                   .peer = R_UNKNOWN,
2036                   .conn = C_STANDALONE,
2037                   .disk = D_DISKLESS,
2038                   .pdsk = D_UNKNOWN,
2039                 } };
2040 }
2041
2042 void drbd_init_set_defaults(struct drbd_conf *mdev)
2043 {
2044         /* the memset(,0,) did most of this.
2045          * note: only assignments, no allocation in here */
2046
2047         drbd_set_defaults(mdev);
2048
2049         atomic_set(&mdev->ap_bio_cnt, 0);
2050         atomic_set(&mdev->ap_pending_cnt, 0);
2051         atomic_set(&mdev->rs_pending_cnt, 0);
2052         atomic_set(&mdev->unacked_cnt, 0);
2053         atomic_set(&mdev->local_cnt, 0);
2054         atomic_set(&mdev->pp_in_use_by_net, 0);
2055         atomic_set(&mdev->rs_sect_in, 0);
2056         atomic_set(&mdev->rs_sect_ev, 0);
2057         atomic_set(&mdev->ap_in_flight, 0);
2058         atomic_set(&mdev->md_io_in_use, 0);
2059
2060         mutex_init(&mdev->own_state_mutex);
2061         mdev->state_mutex = &mdev->own_state_mutex;
2062
2063         spin_lock_init(&mdev->al_lock);
2064         spin_lock_init(&mdev->peer_seq_lock);
2065         spin_lock_init(&mdev->epoch_lock);
2066
2067         INIT_LIST_HEAD(&mdev->active_ee);
2068         INIT_LIST_HEAD(&mdev->sync_ee);
2069         INIT_LIST_HEAD(&mdev->done_ee);
2070         INIT_LIST_HEAD(&mdev->read_ee);
2071         INIT_LIST_HEAD(&mdev->net_ee);
2072         INIT_LIST_HEAD(&mdev->resync_reads);
2073         INIT_LIST_HEAD(&mdev->resync_work.list);
2074         INIT_LIST_HEAD(&mdev->unplug_work.list);
2075         INIT_LIST_HEAD(&mdev->go_diskless.list);
2076         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2077         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2078         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2079
2080         mdev->resync_work.cb  = w_resync_timer;
2081         mdev->unplug_work.cb  = w_send_write_hint;
2082         mdev->go_diskless.cb  = w_go_diskless;
2083         mdev->md_sync_work.cb = w_md_sync;
2084         mdev->bm_io_work.w.cb = w_bitmap_io;
2085         mdev->start_resync_work.cb = w_start_resync;
2086
2087         mdev->resync_work.mdev  = mdev;
2088         mdev->unplug_work.mdev  = mdev;
2089         mdev->go_diskless.mdev  = mdev;
2090         mdev->md_sync_work.mdev = mdev;
2091         mdev->bm_io_work.w.mdev = mdev;
2092         mdev->start_resync_work.mdev = mdev;
2093
2094         init_timer(&mdev->resync_timer);
2095         init_timer(&mdev->md_sync_timer);
2096         init_timer(&mdev->start_resync_timer);
2097         init_timer(&mdev->request_timer);
2098         mdev->resync_timer.function = resync_timer_fn;
2099         mdev->resync_timer.data = (unsigned long) mdev;
2100         mdev->md_sync_timer.function = md_sync_timer_fn;
2101         mdev->md_sync_timer.data = (unsigned long) mdev;
2102         mdev->start_resync_timer.function = start_resync_timer_fn;
2103         mdev->start_resync_timer.data = (unsigned long) mdev;
2104         mdev->request_timer.function = request_timer_fn;
2105         mdev->request_timer.data = (unsigned long) mdev;
2106
2107         init_waitqueue_head(&mdev->misc_wait);
2108         init_waitqueue_head(&mdev->state_wait);
2109         init_waitqueue_head(&mdev->ee_wait);
2110         init_waitqueue_head(&mdev->al_wait);
2111         init_waitqueue_head(&mdev->seq_wait);
2112
2113         mdev->write_ordering = WO_bdev_flush;
2114         mdev->resync_wenr = LC_FREE;
2115         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2116         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2117 }
2118
2119 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2120 {
2121         int i;
2122         if (mdev->tconn->receiver.t_state != NONE)
2123                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2124                                 mdev->tconn->receiver.t_state);
2125
2126         /* no need to lock it, I'm the only thread alive */
2127         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2128                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2129         mdev->al_writ_cnt  =
2130         mdev->bm_writ_cnt  =
2131         mdev->read_cnt     =
2132         mdev->recv_cnt     =
2133         mdev->send_cnt     =
2134         mdev->writ_cnt     =
2135         mdev->p_size       =
2136         mdev->rs_start     =
2137         mdev->rs_total     =
2138         mdev->rs_failed    = 0;
2139         mdev->rs_last_events = 0;
2140         mdev->rs_last_sect_ev = 0;
2141         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2142                 mdev->rs_mark_left[i] = 0;
2143                 mdev->rs_mark_time[i] = 0;
2144         }
2145         D_ASSERT(mdev->tconn->net_conf == NULL);
2146
2147         drbd_set_my_capacity(mdev, 0);
2148         if (mdev->bitmap) {
2149                 /* maybe never allocated. */
2150                 drbd_bm_resize(mdev, 0, 1);
2151                 drbd_bm_cleanup(mdev);
2152         }
2153
2154         drbd_free_bc(mdev->ldev);
2155         mdev->ldev = NULL;
2156
2157         clear_bit(AL_SUSPENDED, &mdev->flags);
2158
2159         D_ASSERT(list_empty(&mdev->active_ee));
2160         D_ASSERT(list_empty(&mdev->sync_ee));
2161         D_ASSERT(list_empty(&mdev->done_ee));
2162         D_ASSERT(list_empty(&mdev->read_ee));
2163         D_ASSERT(list_empty(&mdev->net_ee));
2164         D_ASSERT(list_empty(&mdev->resync_reads));
2165         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2166         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2167         D_ASSERT(list_empty(&mdev->resync_work.list));
2168         D_ASSERT(list_empty(&mdev->unplug_work.list));
2169         D_ASSERT(list_empty(&mdev->go_diskless.list));
2170
2171         drbd_set_defaults(mdev);
2172 }
2173
2174
2175 static void drbd_destroy_mempools(void)
2176 {
2177         struct page *page;
2178
2179         while (drbd_pp_pool) {
2180                 page = drbd_pp_pool;
2181                 drbd_pp_pool = (struct page *)page_private(page);
2182                 __free_page(page);
2183                 drbd_pp_vacant--;
2184         }
2185
2186         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2187
2188         if (drbd_md_io_bio_set)
2189                 bioset_free(drbd_md_io_bio_set);
2190         if (drbd_md_io_page_pool)
2191                 mempool_destroy(drbd_md_io_page_pool);
2192         if (drbd_ee_mempool)
2193                 mempool_destroy(drbd_ee_mempool);
2194         if (drbd_request_mempool)
2195                 mempool_destroy(drbd_request_mempool);
2196         if (drbd_ee_cache)
2197                 kmem_cache_destroy(drbd_ee_cache);
2198         if (drbd_request_cache)
2199                 kmem_cache_destroy(drbd_request_cache);
2200         if (drbd_bm_ext_cache)
2201                 kmem_cache_destroy(drbd_bm_ext_cache);
2202         if (drbd_al_ext_cache)
2203                 kmem_cache_destroy(drbd_al_ext_cache);
2204
2205         drbd_md_io_bio_set   = NULL;
2206         drbd_md_io_page_pool = NULL;
2207         drbd_ee_mempool      = NULL;
2208         drbd_request_mempool = NULL;
2209         drbd_ee_cache        = NULL;
2210         drbd_request_cache   = NULL;
2211         drbd_bm_ext_cache    = NULL;
2212         drbd_al_ext_cache    = NULL;
2213
2214         return;
2215 }
2216
2217 static int drbd_create_mempools(void)
2218 {
2219         struct page *page;
2220         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2221         int i;
2222
2223         /* prepare our caches and mempools */
2224         drbd_request_mempool = NULL;
2225         drbd_ee_cache        = NULL;
2226         drbd_request_cache   = NULL;
2227         drbd_bm_ext_cache    = NULL;
2228         drbd_al_ext_cache    = NULL;
2229         drbd_pp_pool         = NULL;
2230         drbd_md_io_page_pool = NULL;
2231         drbd_md_io_bio_set   = NULL;
2232
2233         /* caches */
2234         drbd_request_cache = kmem_cache_create(
2235                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2236         if (drbd_request_cache == NULL)
2237                 goto Enomem;
2238
2239         drbd_ee_cache = kmem_cache_create(
2240                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2241         if (drbd_ee_cache == NULL)
2242                 goto Enomem;
2243
2244         drbd_bm_ext_cache = kmem_cache_create(
2245                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2246         if (drbd_bm_ext_cache == NULL)
2247                 goto Enomem;
2248
2249         drbd_al_ext_cache = kmem_cache_create(
2250                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2251         if (drbd_al_ext_cache == NULL)
2252                 goto Enomem;
2253
2254         /* mempools */
2255         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2256         if (drbd_md_io_bio_set == NULL)
2257                 goto Enomem;
2258
2259         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2260         if (drbd_md_io_page_pool == NULL)
2261                 goto Enomem;
2262
2263         drbd_request_mempool = mempool_create(number,
2264                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2265         if (drbd_request_mempool == NULL)
2266                 goto Enomem;
2267
2268         drbd_ee_mempool = mempool_create(number,
2269                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2270         if (drbd_ee_mempool == NULL)
2271                 goto Enomem;
2272
2273         /* drbd's page pool */
2274         spin_lock_init(&drbd_pp_lock);
2275
2276         for (i = 0; i < number; i++) {
2277                 page = alloc_page(GFP_HIGHUSER);
2278                 if (!page)
2279                         goto Enomem;
2280                 set_page_private(page, (unsigned long)drbd_pp_pool);
2281                 drbd_pp_pool = page;
2282         }
2283         drbd_pp_vacant = number;
2284
2285         return 0;
2286
2287 Enomem:
2288         drbd_destroy_mempools(); /* in case we allocated some */
2289         return -ENOMEM;
2290 }
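/*
 * Editor's note (illustrative): the pre-allocated pages above are chained
 * into a simple LIFO through their page_private field; drbd_destroy_mempools()
 * pops them the same way when the module is unloaded.
 */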
2291
2292 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2293         void *unused)
2294 {
2295         /* just so we have it.  you never know what interesting things we
2296          * might want to do here some day...
2297          */
2298
2299         return NOTIFY_DONE;
2300 }
2301
2302 static struct notifier_block drbd_notifier = {
2303         .notifier_call = drbd_notify_sys,
2304 };
2305
2306 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2307 {
2308         int rr;
2309
2310         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2311         if (rr)
2312                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2313
2314         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2315         if (rr)
2316                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2317
2318         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2319         if (rr)
2320                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2321
2322         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2323         if (rr)
2324                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2325
2326         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2327         if (rr)
2328                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2329 }
2330
2331 /* caution. no locking. */
2332 void drbd_minor_destroy(struct kref *kref)
2333 {
2334         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2335         struct drbd_tconn *tconn = mdev->tconn;
2336
2337         del_timer_sync(&mdev->request_timer);
2338
2339         /* paranoia asserts */
2340         D_ASSERT(mdev->open_cnt == 0);
2341         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2342         /* end paranoia asserts */
2343
2344         /* cleanup stuff that may have been allocated during
2345          * device (re-)configuration or state changes */
2346
2347         if (mdev->this_bdev)
2348                 bdput(mdev->this_bdev);
2349
2350         drbd_free_bc(mdev->ldev);
2351         mdev->ldev = NULL;
2352
2353         drbd_release_all_peer_reqs(mdev);
2354
2355         lc_destroy(mdev->act_log);
2356         lc_destroy(mdev->resync);
2357
2358         kfree(mdev->p_uuid);
2359         /* mdev->p_uuid = NULL; */
2360
2361         kfree(mdev->current_epoch);
2362         if (mdev->bitmap) /* should no longer be there. */
2363                 drbd_bm_cleanup(mdev);
2364         __free_page(mdev->md_io_page);
2365         put_disk(mdev->vdisk);
2366         blk_cleanup_queue(mdev->rq_queue);
2367         kfree(mdev->rs_plan_s);
2368         kfree(mdev);
2369
2370         kref_put(&tconn->kref, &conn_destroy);
2371 }
2372
2373 static void drbd_cleanup(void)
2374 {
2375         unsigned int i;
2376         struct drbd_conf *mdev;
2377         struct drbd_tconn *tconn, *tmp;
2378
2379         unregister_reboot_notifier(&drbd_notifier);
2380
2381         /* first remove proc,
2382          * drbdsetup uses its presence to detect
2383          * whether DRBD is loaded.
2384          * If we got stuck in proc removal
2385          * while netlink was already deregistered,
2386          * some drbdsetup commands might wait forever
2387          * for an answer.
2388          */
2389         if (drbd_proc)
2390                 remove_proc_entry("drbd", NULL);
2391
2392         drbd_genl_unregister();
2393
2394         idr_for_each_entry(&minors, mdev, i) {
2395                 idr_remove(&minors, mdev_to_minor(mdev));
2396                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2397                 del_gendisk(mdev->vdisk);
2398                 /* synchronize_rcu(); No other threads running at this point */
2399                 kref_put(&mdev->kref, &drbd_minor_destroy);
2400         }
2401
2402         /* not _rcu since there is no other updater anymore; genl is already unregistered */
2403         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2404                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2405                 /* synchronize_rcu(); */
2406                 kref_put(&tconn->kref, &conn_destroy);
2407         }
2408
2409         drbd_destroy_mempools();
2410         unregister_blkdev(DRBD_MAJOR, "drbd");
2411
2412         idr_destroy(&minors);
2413
2414         printk(KERN_INFO "drbd: module cleanup done.\n");
2415 }
2416
2417 /**
2418  * drbd_congested() - Callback for pdflush
2419  * @congested_data:     User data
2420  * @bdi_bits:           Bits pdflush is currently interested in
2421  *
2422  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2423  */
2424 static int drbd_congested(void *congested_data, int bdi_bits)
2425 {
2426         struct drbd_conf *mdev = congested_data;
2427         struct request_queue *q;
2428         char reason = '-';
2429         int r = 0;
2430
2431         if (!may_inc_ap_bio(mdev)) {
2432                 /* DRBD has frozen IO */
2433                 r = bdi_bits;
2434                 reason = 'd';
2435                 goto out;
2436         }
2437
2438         if (get_ldev(mdev)) {
2439                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2440                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2441                 put_ldev(mdev);
2442                 if (r)
2443                         reason = 'b';
2444         }
2445
2446         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2447                 r |= (1 << BDI_async_congested);
2448                 reason = reason == 'b' ? 'a' : 'n';
2449         }
2450
2451 out:
2452         mdev->congestion_reason = reason;
2453         return r;
2454 }
2455
2456 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2457 {
2458         sema_init(&wq->s, 0);
2459         spin_lock_init(&wq->q_lock);
2460         INIT_LIST_HEAD(&wq->q);
2461 }
2462
2463 struct drbd_tconn *conn_get_by_name(const char *name)
2464 {
2465         struct drbd_tconn *tconn;
2466
2467         if (!name || !name[0])
2468                 return NULL;
2469
2470         rcu_read_lock();
2471         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2472                 if (!strcmp(tconn->name, name)) {
2473                         kref_get(&tconn->kref);
2474                         goto found;
2475                 }
2476         }
2477         tconn = NULL;
2478 found:
2479         rcu_read_unlock();
2480         return tconn;
2481 }
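/*
 * Editor's illustrative sketch (hypothetical caller, not driver code):
 * conn_get_by_name() returns with an extra kref held, which the caller is
 * expected to drop again once done.  "r0" is an example resource name.
 *
 *	struct drbd_tconn *tconn = conn_get_by_name("r0");
 *
 *	if (tconn) {
 *		... use tconn ...
 *		kref_put(&tconn->kref, &conn_destroy);
 *	}
 */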
2482
2483 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2484                                      void *peer_addr, int peer_addr_len)
2485 {
2486         struct drbd_tconn *tconn;
2487
2488         rcu_read_lock();
2489         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2490                 if (tconn->my_addr_len == my_addr_len &&
2491                     tconn->peer_addr_len == peer_addr_len &&
2492                     !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2493                     !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2494                         kref_get(&tconn->kref);
2495                         goto found;
2496                 }
2497         }
2498         tconn = NULL;
2499 found:
2500         rcu_read_unlock();
2501         return tconn;
2502 }
2503
2504 static int drbd_alloc_socket(struct drbd_socket *socket)
2505 {
2506         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2507         if (!socket->rbuf)
2508                 return -ENOMEM;
2509         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2510         if (!socket->sbuf)
2511                 return -ENOMEM;
2512         return 0;
2513 }
2514
2515 static void drbd_free_socket(struct drbd_socket *socket)
2516 {
2517         free_page((unsigned long) socket->sbuf);
2518         free_page((unsigned long) socket->rbuf);
2519 }
2520
2521 void conn_free_crypto(struct drbd_tconn *tconn)
2522 {
2523         drbd_free_sock(tconn);
2524
2525         crypto_free_hash(tconn->csums_tfm);
2526         crypto_free_hash(tconn->verify_tfm);
2527         crypto_free_hash(tconn->cram_hmac_tfm);
2528         crypto_free_hash(tconn->integrity_tfm);
2529         crypto_free_hash(tconn->peer_integrity_tfm);
2530         kfree(tconn->int_dig_in);
2531         kfree(tconn->int_dig_vv);
2532
2533         tconn->csums_tfm = NULL;
2534         tconn->verify_tfm = NULL;
2535         tconn->cram_hmac_tfm = NULL;
2536         tconn->integrity_tfm = NULL;
2537         tconn->peer_integrity_tfm = NULL;
2538         tconn->int_dig_in = NULL;
2539         tconn->int_dig_vv = NULL;
2540 }
2541
2542 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2543 {
2544         cpumask_var_t new_cpu_mask;
2545         int err;
2546
2547         if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2548                 return -ENOMEM;
2549                 /*
2550                 retcode = ERR_NOMEM;
2551                 drbd_msg_put_info("unable to allocate cpumask");
2552                 */
2553
2554         /* silently ignore cpu mask on UP kernel */
2555         if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2556                 /* FIXME: Get rid of constant 32 here */
2557                 err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
2558                                 cpumask_bits(new_cpu_mask), nr_cpu_ids);
2559                 if (err) {
2560                         conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
2561                         /* retcode = ERR_CPU_MASK_PARSE; */
2562                         goto fail;
2563                 }
2564         }
2565         tconn->res_opts = *res_opts;
2566         if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2567                 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2568                 drbd_calc_cpu_mask(tconn);
2569                 tconn->receiver.reset_cpu_mask = 1;
2570                 tconn->asender.reset_cpu_mask = 1;
2571                 tconn->worker.reset_cpu_mask = 1;
2572         }
2573         err = 0;
2574
2575 fail:
2576         free_cpumask_var(new_cpu_mask);
2577         return err;
2578
2579 }
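/*
 * Editor's note (illustrative): res_opts->cpu_mask is a hex bitmap string as
 * understood by __bitmap_parse(), so e.g. a mask of "3" would bind the
 * receiver, asender and worker threads of this resource to CPUs 0 and 1.
 */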
2580
2581 /* caller must be under genl_lock() */
2582 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2583 {
2584         struct drbd_tconn *tconn;
2585
2586         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2587         if (!tconn)
2588                 return NULL;
2589
2590         tconn->name = kstrdup(name, GFP_KERNEL);
2591         if (!tconn->name)
2592                 goto fail;
2593
2594         if (drbd_alloc_socket(&tconn->data))
2595                 goto fail;
2596         if (drbd_alloc_socket(&tconn->meta))
2597                 goto fail;
2598
2599         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2600                 goto fail;
2601
2602         if (set_resource_options(tconn, res_opts))
2603                 goto fail;
2604
2605         if (!tl_init(tconn))
2606                 goto fail;
2607
2608         tconn->cstate = C_STANDALONE;
2609         mutex_init(&tconn->cstate_mutex);
2610         spin_lock_init(&tconn->req_lock);
2611         mutex_init(&tconn->conf_update);
2612         init_waitqueue_head(&tconn->ping_wait);
2613         idr_init(&tconn->volumes);
2614
2615         drbd_init_workqueue(&tconn->data.work);
2616         mutex_init(&tconn->data.mutex);
2617
2618         drbd_init_workqueue(&tconn->meta.work);
2619         mutex_init(&tconn->meta.mutex);
2620
2621         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2622         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2623         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2624
2625         kref_init(&tconn->kref);
2626         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2627
2628         return tconn;
2629
2630 fail:
2631         tl_cleanup(tconn);
2632         free_cpumask_var(tconn->cpu_mask);
2633         drbd_free_socket(&tconn->meta);
2634         drbd_free_socket(&tconn->data);
2635         kfree(tconn->name);
2636         kfree(tconn);
2637
2638         return NULL;
2639 }
2640
2641 void conn_destroy(struct kref *kref)
2642 {
2643         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2644
2645         idr_destroy(&tconn->volumes);
2646
2647         free_cpumask_var(tconn->cpu_mask);
2648         drbd_free_socket(&tconn->meta);
2649         drbd_free_socket(&tconn->data);
2650         kfree(tconn->name);
2651         kfree(tconn->int_dig_in);
2652         kfree(tconn->int_dig_vv);
2653         kfree(tconn);
2654 }
2655
2656 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2657 {
2658         struct drbd_conf *mdev;
2659         struct gendisk *disk;
2660         struct request_queue *q;
2661         int vnr_got = vnr;
2662         int minor_got = minor;
2663         enum drbd_ret_code err = ERR_NOMEM;
2664
2665         mdev = minor_to_mdev(minor);
2666         if (mdev)
2667                 return ERR_MINOR_EXISTS;
2668
2669         /* GFP_KERNEL, we are outside of all write-out paths */
2670         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2671         if (!mdev)
2672                 return ERR_NOMEM;
2673
2674         kref_get(&tconn->kref);
2675         mdev->tconn = tconn;
2676
2677         mdev->minor = minor;
2678         mdev->vnr = vnr;
2679
2680         drbd_init_set_defaults(mdev);
2681
2682         q = blk_alloc_queue(GFP_KERNEL);
2683         if (!q)
2684                 goto out_no_q;
2685         mdev->rq_queue = q;
2686         q->queuedata   = mdev;
2687
2688         disk = alloc_disk(1);
2689         if (!disk)
2690                 goto out_no_disk;
2691         mdev->vdisk = disk;
2692
2693         set_disk_ro(disk, true);
2694
2695         disk->queue = q;
2696         disk->major = DRBD_MAJOR;
2697         disk->first_minor = minor;
2698         disk->fops = &drbd_ops;
2699         sprintf(disk->disk_name, "drbd%d", minor);
2700         disk->private_data = mdev;
2701
2702         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2703         /* we have no partitions. we contain only ourselves. */
2704         mdev->this_bdev->bd_contains = mdev->this_bdev;
2705
2706         q->backing_dev_info.congested_fn = drbd_congested;
2707         q->backing_dev_info.congested_data = mdev;
2708
2709         blk_queue_make_request(q, drbd_make_request);
2710         /* Setting the max_hw_sectors to an odd value of 8 KiB here.
2711            This triggers a max_bio_size message upon first attach or connect. */
2712         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2713         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2714         blk_queue_merge_bvec(q, drbd_merge_bvec);
2715         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2716
2717         mdev->md_io_page = alloc_page(GFP_KERNEL);
2718         if (!mdev->md_io_page)
2719                 goto out_no_io_page;
2720
2721         if (drbd_bm_init(mdev))
2722                 goto out_no_bitmap;
2723         mdev->read_requests = RB_ROOT;
2724         mdev->write_requests = RB_ROOT;
2725
2726         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2727         if (!mdev->current_epoch)
2728                 goto out_no_epoch;
2729
2730         INIT_LIST_HEAD(&mdev->current_epoch->list);
2731         mdev->epochs = 1;
2732
2733         if (!idr_pre_get(&minors, GFP_KERNEL))
2734                 goto out_no_minor_idr;
2735         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2736                 goto out_no_minor_idr;
2737         if (minor_got != minor) {
2738                 err = ERR_MINOR_EXISTS;
2739                 drbd_msg_put_info("requested minor exists already");
2740                 goto out_idr_remove_minor;
2741         }
2742
2743         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2744                 goto out_idr_remove_minor;
2745         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2746                 goto out_idr_remove_minor;
2747         if (vnr_got != vnr) {
2748                 err = ERR_INVALID_REQUEST;
2749                 drbd_msg_put_info("requested volume exists already");
2750                 goto out_idr_remove_vol;
2751         }
2752         add_disk(disk);
2753         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2754
2755         /* inherit the connection state */
2756         mdev->state.conn = tconn->cstate;
2757         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2758                 drbd_connected(mdev);
2759
2760         return NO_ERROR;
2761
2762 out_idr_remove_vol:
2763         idr_remove(&tconn->volumes, vnr_got);
2764 out_idr_remove_minor:
2765         idr_remove(&minors, minor_got);
2766         synchronize_rcu();
2767 out_no_minor_idr:
2768         kfree(mdev->current_epoch);
2769 out_no_epoch:
2770         drbd_bm_cleanup(mdev);
2771 out_no_bitmap:
2772         __free_page(mdev->md_io_page);
2773 out_no_io_page:
2774         put_disk(disk);
2775 out_no_disk:
2776         blk_cleanup_queue(q);
2777 out_no_q:
2778         kfree(mdev);
2779         kref_put(&tconn->kref, &conn_destroy);
2780         return err;
2781 }
2782
2783 int __init drbd_init(void)
2784 {
2785         int err;
2786
2787         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2788                 printk(KERN_ERR
2789                        "drbd: invalid minor_count (%d)\n", minor_count);
2790 #ifdef MODULE
2791                 return -EINVAL;
2792 #else
2793                 minor_count = DRBD_MINOR_COUNT_DEF;
2794 #endif
2795         }
2796
2797         err = register_blkdev(DRBD_MAJOR, "drbd");
2798         if (err) {
2799                 printk(KERN_ERR
2800                        "drbd: unable to register block device major %d\n",
2801                        DRBD_MAJOR);
2802                 return err;
2803         }
2804
2805         err = drbd_genl_register();
2806         if (err) {
2807                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2808                 goto fail;
2809         }
2810
2811
2812         register_reboot_notifier(&drbd_notifier);
2813
2814         /*
2815          * allocate all necessary structs
2816          */
2817         err = -ENOMEM;
2818
2819         init_waitqueue_head(&drbd_pp_wait);
2820
2821         drbd_proc = NULL; /* play safe for drbd_cleanup */
2822         idr_init(&minors);
2823
2824         err = drbd_create_mempools();
2825         if (err)
2826                 goto fail;
2827
2828         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2829         if (!drbd_proc) {
2830                 printk(KERN_ERR "drbd: unable to register proc file\n");
2831                 goto fail;
2832         }
2833
2834         rwlock_init(&global_state_lock);
2835         INIT_LIST_HEAD(&drbd_tconns);
2836
2837         printk(KERN_INFO "drbd: initialized. "
2838                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2839                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2840         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2841         printk(KERN_INFO "drbd: registered as block device major %d\n",
2842                 DRBD_MAJOR);
2843
2844         return 0; /* Success! */
2845
2846 fail:
2847         drbd_cleanup();
2848         if (err == -ENOMEM)
2849                 /* currently always the case */
2850                 printk(KERN_ERR "drbd: ran out of memory\n");
2851         else
2852                 printk(KERN_ERR "drbd: initialization failure\n");
2853         return err;
2854 }
2855
2856 void drbd_free_bc(struct drbd_backing_dev *ldev)
2857 {
2858         if (ldev == NULL)
2859                 return;
2860
2861         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2862         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2863
2864         kfree(ldev);
2865 }
2866
2867 void drbd_free_sock(struct drbd_tconn *tconn)
2868 {
2869         if (tconn->data.socket) {
2870                 mutex_lock(&tconn->data.mutex);
2871                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2872                 sock_release(tconn->data.socket);
2873                 tconn->data.socket = NULL;
2874                 mutex_unlock(&tconn->data.mutex);
2875         }
2876         if (tconn->meta.socket) {
2877                 mutex_lock(&tconn->meta.mutex);
2878                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2879                 sock_release(tconn->meta.socket);
2880                 tconn->meta.socket = NULL;
2881                 mutex_unlock(&tconn->meta.mutex);
2882         }
2883 }
2884
2885 /* meta data management */
2886
2887 struct meta_data_on_disk {
2888         u64 la_size;           /* last agreed size. */
2889         u64 uuid[UI_SIZE];   /* UUIDs. */
2890         u64 device_uuid;
2891         u64 reserved_u64_1;
2892         u32 flags;             /* MDF */
2893         u32 magic;
2894         u32 md_size_sect;
2895         u32 al_offset;         /* offset to this block */
2896         u32 al_nr_extents;     /* important for restoring the AL */
2897               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2898         u32 bm_offset;         /* offset to the bitmap, from here */
2899         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2900         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2901         u32 reserved_u32[3];
2902
2903 } __packed;
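/*
 * Editor's note (illustrative): every multi-byte field of this on-disk
 * layout is stored big endian; drbd_md_sync() below converts with
 * cpu_to_be32()/cpu_to_be64() before writing, and drbd_md_read() converts
 * back with be32_to_cpu()/be64_to_cpu() after reading.
 */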
2904
2905 /**
2906  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2907  * @mdev:       DRBD device.
2908  */
2909 void drbd_md_sync(struct drbd_conf *mdev)
2910 {
2911         struct meta_data_on_disk *buffer;
2912         sector_t sector;
2913         int i;
2914
2915         del_timer(&mdev->md_sync_timer);
2916         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2917         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2918                 return;
2919
2920         /* We use D_FAILED here and not D_ATTACHING because we try to write
2921          * metadata even if we detach due to a disk failure! */
2922         if (!get_ldev_if_state(mdev, D_FAILED))
2923                 return;
2924
2925         buffer = drbd_md_get_buffer(mdev);
2926         if (!buffer)
2927                 goto out;
2928
2929         memset(buffer, 0, 512);
2930
2931         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2932         for (i = UI_CURRENT; i < UI_SIZE; i++)
2933                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2934         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2935         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2936
2937         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2938         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2939         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2940         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2941         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2942
2943         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2944         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2945
2946         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2947         sector = mdev->ldev->md.md_offset;
2948
2949         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2950                 /* this was a try anyways ... */
2951                 dev_err(DEV, "meta data update failed!\n");
2952                 drbd_chk_io_error(mdev, 1, true);
2953         }
2954
2955         /* Update mdev->ldev->md.la_size_sect,
2956          * since we just updated it in the on-disk metadata. */
2957         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2958
2959         drbd_md_put_buffer(mdev);
2960 out:
2961         put_ldev(mdev);
2962 }
2963
2964 /**
2965  * drbd_md_read() - Reads in the meta data super block
2966  * @mdev:       DRBD device.
2967  * @bdev:       Device from which the meta data should be read in.
2968  *
2969  * Return NO_ERROR on success, and an enum drbd_ret_code in case
2970  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2971  */
2972 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2973 {
2974         struct meta_data_on_disk *buffer;
2975         int i, rv = NO_ERROR;
2976
2977         if (!get_ldev_if_state(mdev, D_ATTACHING))
2978                 return ERR_IO_MD_DISK;
2979
2980         buffer = drbd_md_get_buffer(mdev);
2981         if (!buffer)
2982                 goto out;
2983
2984         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2985                 /* NOTE: can't do normal error processing here as this is
2986                    called BEFORE disk is attached */
2987                 dev_err(DEV, "Error while reading metadata.\n");
2988                 rv = ERR_IO_MD_DISK;
2989                 goto err;
2990         }
2991
2992         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2993                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2994                 rv = ERR_MD_INVALID;
2995                 goto err;
2996         }
2997         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2998                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2999                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3000                 rv = ERR_MD_INVALID;
3001                 goto err;
3002         }
3003         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3004                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3005                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3006                 rv = ERR_MD_INVALID;
3007                 goto err;
3008         }
3009         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3010                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3011                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3012                 rv = ERR_MD_INVALID;
3013                 goto err;
3014         }
3015
3016         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3017                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3018                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3019                 rv = ERR_MD_INVALID;
3020                 goto err;
3021         }
3022
3023         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3024         for (i = UI_CURRENT; i < UI_SIZE; i++)
3025                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3026         bdev->md.flags = be32_to_cpu(buffer->flags);
3027         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3028
3029         spin_lock_irq(&mdev->tconn->req_lock);
3030         if (mdev->state.conn < C_CONNECTED) {
3031                 int peer;
3032                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3033                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3034                 mdev->peer_max_bio_size = peer;
3035         }
3036         spin_unlock_irq(&mdev->tconn->req_lock);
3037
3038         /* This block wants to get removed... */
3039         bdev->disk_conf->al_extents = be32_to_cpu(buffer->al_nr_extents);
3040         if (bdev->disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
3041                 bdev->disk_conf->al_extents = DRBD_AL_EXTENTS_DEF;
3042
3043  err:
3044         drbd_md_put_buffer(mdev);
3045  out:
3046         put_ldev(mdev);
3047
3048         return rv;
3049 }
3050
3051 /**
3052  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3053  * @mdev:       DRBD device.
3054  *
3055  * Call this function if you change anything that should be written to
3056  * the meta-data super block. This function sets MD_DIRTY and arms a
3057  * timer that ensures drbd_md_sync() gets called within five seconds.
3058  */
3059 #ifdef DEBUG
3060 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3061 {
3062         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3063                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3064                 mdev->last_md_mark_dirty.line = line;
3065                 mdev->last_md_mark_dirty.func = func;
3066         }
3067 }
3068 #else
3069 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3070 {
3071         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3072                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3073 }
3074 #endif
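/*
 * Editor's sketch (hypothetical helper, not upstream code): the intended
 * pairing is to change a persistent field, mark the super block dirty, and
 * either call drbd_md_sync() directly or let the md_sync_timer / worker
 * flush it out within five seconds:
 *
 *	static void example_set_persistent_flag(struct drbd_conf *mdev, u32 flag)
 *	{
 *		mdev->ldev->md.flags |= flag;	// change something mirrored on disk
 *		drbd_md_mark_dirty(mdev);	// sets MD_DIRTY, arms the timer
 *		drbd_md_sync(mdev);		// optional: flush immediately
 *	}
 */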
3075
3076 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3077 {
3078         int i;
3079
3080         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3081                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3082 }
3083
3084 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3085 {
3086         if (idx == UI_CURRENT) {
3087                 if (mdev->state.role == R_PRIMARY)
3088                         val |= 1;
3089                 else
3090                         val &= ~((u64)1);
3091
3092                 drbd_set_ed_uuid(mdev, val);
3093         }
3094
3095         mdev->ldev->md.uuid[idx] = val;
3096         drbd_md_mark_dirty(mdev);
3097 }
3098
3100 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3101 {
3102         if (mdev->ldev->md.uuid[idx]) {
3103                 drbd_uuid_move_history(mdev);
3104                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3105         }
3106         _drbd_uuid_set(mdev, idx, val);
3107 }
3108
3109 /**
3110  * drbd_uuid_new_current() - Creates a new current UUID
3111  * @mdev:       DRBD device.
3112  *
3113  * Creates a new current UUID, and rotates the old current UUID into
3114  * the bitmap slot. Causes an incremental resync upon next connect.
3115  */
3116 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3117 {
3118         u64 val;
3119         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3120
3121         if (bm_uuid)
3122                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3123
3124         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3125
3126         get_random_bytes(&val, sizeof(u64));
3127         _drbd_uuid_set(mdev, UI_CURRENT, val);
3128         drbd_print_uuids(mdev, "new current UUID");
3129         /* get it to stable storage _now_ */
3130         drbd_md_sync(mdev);
3131 }
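/*
 * Editor's illustration (informal, derived from the assignments above): the
 * effect of drbd_uuid_new_current() on the UUID slots is roughly
 *
 *	before:  UI_CURRENT = C          UI_BITMAP = 0 (warned about otherwise)
 *	after:   UI_CURRENT = <random>   UI_BITMAP = C
 *
 * so that on the next connect the peers can recognize C as their common
 * point in history and resync incrementally from the bitmap.
 */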
3132
3133 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3134 {
3135         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3136                 return;
3137
3138         if (val == 0) {
3139                 drbd_uuid_move_history(mdev);
3140                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3141                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3142         } else {
3143                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3144                 if (bm_uuid)
3145                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3146
3147                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3148         }
3149         drbd_md_mark_dirty(mdev);
3150 }
3151
3152 /**
3153  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3154  * @mdev:       DRBD device.
3155  *
3156  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3157  */
3158 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3159 {
3160         int rv = -EIO;
3161
3162         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3163                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3164                 drbd_md_sync(mdev);
3165                 drbd_bm_set_all(mdev);
3166
3167                 rv = drbd_bm_write(mdev);
3168
3169                 if (!rv) {
3170                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3171                         drbd_md_sync(mdev);
3172                 }
3173
3174                 put_ldev(mdev);
3175         }
3176
3177         return rv;
3178 }
3179
3180 /**
3181  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3182  * @mdev:       DRBD device.
3183  *
3184  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3185  */
3186 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3187 {
3188         int rv = -EIO;
3189
3190         drbd_resume_al(mdev);
3191         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3192                 drbd_bm_clear_all(mdev);
3193                 rv = drbd_bm_write(mdev);
3194                 put_ldev(mdev);
3195         }
3196
3197         return rv;
3198 }
3199
3200 static int w_bitmap_io(struct drbd_work *w, int unused)
3201 {
3202         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3203         struct drbd_conf *mdev = w->mdev;
3204         int rv = -EIO;
3205
3206         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3207
3208         if (get_ldev(mdev)) {
3209                 drbd_bm_lock(mdev, work->why, work->flags);
3210                 rv = work->io_fn(mdev);
3211                 drbd_bm_unlock(mdev);
3212                 put_ldev(mdev);
3213         }
3214
3215         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3216         wake_up(&mdev->misc_wait);
3217
3218         if (work->done)
3219                 work->done(mdev, rv);
3220
3221         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3222         work->why = NULL;
3223         work->flags = 0;
3224
3225         return 0;
3226 }
3227
3228 void drbd_ldev_destroy(struct drbd_conf *mdev)
3229 {
3230         lc_destroy(mdev->resync);
3231         mdev->resync = NULL;
3232         lc_destroy(mdev->act_log);
3233         mdev->act_log = NULL;
3234         __no_warn(local,
3235                 drbd_free_bc(mdev->ldev);
3236                 mdev->ldev = NULL;);
3237
3238         clear_bit(GO_DISKLESS, &mdev->flags);
3239 }
3240
3241 static int w_go_diskless(struct drbd_work *w, int unused)
3242 {
3243         struct drbd_conf *mdev = w->mdev;
3244
3245         D_ASSERT(mdev->state.disk == D_FAILED);
3246         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3247          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3248          * the protected members anymore, though, so once put_ldev reaches zero
3249          * again, it will be safe to free them. */
3250         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3251         return 0;
3252 }
3253
3254 void drbd_go_diskless(struct drbd_conf *mdev)
3255 {
3256         D_ASSERT(mdev->state.disk == D_FAILED);
3257         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3258                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3259 }
3260
3261 /**
3262  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3263  * @mdev:       DRBD device.
3264  * @io_fn:      IO callback to be called when bitmap IO is possible
3265  * @done:       callback to be called after the bitmap IO was performed
3266  * @why:        Descriptive text of the reason for doing the IO
3267  *
3268  * While IO on the bitmap is in progress, application IO is frozen, which
3269  * ensures that drbd_set_out_of_sync() cannot be called. This function MAY
3270  * ONLY be called from worker context. It MUST NOT be used while a previous
3271  * such work item is still pending!
3272  */
3273 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3274                           int (*io_fn)(struct drbd_conf *),
3275                           void (*done)(struct drbd_conf *, int),
3276                           char *why, enum bm_flag flags)
3277 {
3278         D_ASSERT(current == mdev->tconn->worker.task);
3279
3280         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3281         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3282         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3283         if (mdev->bm_io_work.why)
3284                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3285                         why, mdev->bm_io_work.why);
3286
3287         mdev->bm_io_work.io_fn = io_fn;
3288         mdev->bm_io_work.done = done;
3289         mdev->bm_io_work.why = why;
3290         mdev->bm_io_work.flags = flags;
3291
3292         spin_lock_irq(&mdev->tconn->req_lock);
3293         set_bit(BITMAP_IO, &mdev->flags);
3294         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3295                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3296                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3297         }
3298         spin_unlock_irq(&mdev->tconn->req_lock);
3299 }
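/*
 * Editor's sketch of a call site (hypothetical arguments): from worker
 * context, a full sync could be prepared by queueing drbd_bmio_set_n_write()
 * as the io_fn; the done callback, description string and flag choice below
 * are placeholders for illustration only:
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     example_done_fn,	// void (*)(struct drbd_conf *, int)
 *			     "example: set_n_write", BM_LOCKED_SET_ALLOWED);
 */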
3300
3301 /**
3302  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3303  * @mdev:       DRBD device.
3304  * @io_fn:      IO callback to be called when bitmap IO is possible
3305  * @why:        Descriptive text of the reason for doing the IO
3306  *
3307  * Freezes application IO while the actual bitmap IO operation runs. This
3308  * function MUST NOT be called from worker context.
3309  */
3310 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3311                 char *why, enum bm_flag flags)
3312 {
3313         int rv;
3314
3315         D_ASSERT(current != mdev->tconn->worker.task);
3316
3317         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3318                 drbd_suspend_io(mdev);
3319
3320         drbd_bm_lock(mdev, why, flags);
3321         rv = io_fn(mdev);
3322         drbd_bm_unlock(mdev);
3323
3324         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3325                 drbd_resume_io(mdev);
3326
3327         return rv;
3328 }
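/*
 * Editor's sketch (hypothetical call, flag chosen only to illustrate the
 * suspend/resume logic above): the synchronous counterpart, used outside of
 * worker context, might look like
 *
 *	rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *			    "example: clear_n_write", BM_LOCKED_SET_ALLOWED);
 *
 * Because BM_LOCKED_SET_ALLOWED is set here, the suspend/resume of
 * application IO is skipped; the io_fn runs under the bitmap lock and its
 * return value is passed through as rv.
 */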
3329
3330 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3331 {
3332         if ((mdev->ldev->md.flags & flag) != flag) {
3333                 drbd_md_mark_dirty(mdev);
3334                 mdev->ldev->md.flags |= flag;
3335         }
3336 }
3337
3338 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3339 {
3340         if ((mdev->ldev->md.flags & flag) != 0) {
3341                 drbd_md_mark_dirty(mdev);
3342                 mdev->ldev->md.flags &= ~flag;
3343         }
3344 }

3345 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3346 {
3347         return (bdev->md.flags & flag) != 0;
3348 }
3349
3350 static void md_sync_timer_fn(unsigned long data)
3351 {
3352         struct drbd_conf *mdev = (struct drbd_conf *) data;
3353
3354         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3355 }
3356
3357 static int w_md_sync(struct drbd_work *w, int unused)
3358 {
3359         struct drbd_conf *mdev = w->mdev;
3360
3361         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3362 #ifdef DEBUG
3363         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3364                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3365 #endif
3366         drbd_md_sync(mdev);
3367         return 0;
3368 }
3369
3370 const char *cmdname(enum drbd_packet cmd)
3371 {
3372         /* THINK may need to become several global tables
3373          * when we want to support more than
3374          * one PRO_VERSION */
3375         static const char *cmdnames[] = {
3376                 [P_DATA]                = "Data",
3377                 [P_DATA_REPLY]          = "DataReply",
3378                 [P_RS_DATA_REPLY]       = "RSDataReply",
3379                 [P_BARRIER]             = "Barrier",
3380                 [P_BITMAP]              = "ReportBitMap",
3381                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3382                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3383                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3384                 [P_DATA_REQUEST]        = "DataRequest",
3385                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3386                 [P_SYNC_PARAM]          = "SyncParam",
3387                 [P_SYNC_PARAM89]        = "SyncParam89",
3388                 [P_PROTOCOL]            = "ReportProtocol",
3389                 [P_UUIDS]               = "ReportUUIDs",
3390                 [P_SIZES]               = "ReportSizes",
3391                 [P_STATE]               = "ReportState",
3392                 [P_SYNC_UUID]           = "ReportSyncUUID",
3393                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3394                 [P_AUTH_RESPONSE]       = "AuthResponse",
3395                 [P_PING]                = "Ping",
3396                 [P_PING_ACK]            = "PingAck",
3397                 [P_RECV_ACK]            = "RecvAck",
3398                 [P_WRITE_ACK]           = "WriteAck",
3399                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3400                 [P_DISCARD_WRITE]        = "DiscardWrite",
3401                 [P_NEG_ACK]             = "NegAck",
3402                 [P_NEG_DREPLY]          = "NegDReply",
3403                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3404                 [P_BARRIER_ACK]         = "BarrierAck",
3405                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3406                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3407                 [P_OV_REQUEST]          = "OVRequest",
3408                 [P_OV_REPLY]            = "OVReply",
3409                 [P_OV_RESULT]           = "OVResult",
3410                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3411                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3412                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3413                 [P_DELAY_PROBE]         = "DelayProbe",
3414                 [P_OUT_OF_SYNC]         = "OutOfSync",
3416                 [P_RS_CANCEL]           = "RSCancel",
3417                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3418                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3419                 [P_RETRY_WRITE]         = "retry_write",
3420                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3421
3422                 /* enum drbd_packet, but not commands - obsoleted flags:
3423                  *      P_MAY_IGNORE
3424                  *      P_MAX_OPT_CMD
3425                  */
3426         };
3427
3428         /* too big for the array: 0xfffX */
3429         if (cmd == P_INITIAL_META)
3430                 return "InitialMeta";
3431         if (cmd == P_INITIAL_DATA)
3432                 return "InitialData";
3433         if (cmd == P_CONNECTION_FEATURES)
3434                 return "ConnectionFeatures";
3435         if (cmd >= ARRAY_SIZE(cmdnames))
3436                 return "Unknown";
3437         return cmdnames[cmd];
3438 }
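/*
 * Editor's note: array slots not covered by a designated initializer above
 * are NULL.  printk-style "%s" users will see "(null)" in that case rather
 * than an oops, but other consumers of cmdname() should be prepared for a
 * NULL return for unnamed in-range packet values.
 */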
3439
3440 /**
3441  * drbd_wait_misc  -  wait for a request to make progress
3442  * @mdev:       device associated with the request
3443  * @i:          the struct drbd_interval embedded in struct drbd_request or
3444  *              struct drbd_peer_request
3445  */
3446 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3447 {
3448         struct net_conf *nc;
3449         DEFINE_WAIT(wait);
3450         long timeout;
3451
3452         rcu_read_lock();
3453         nc = rcu_dereference(mdev->tconn->net_conf);
3454         if (!nc) {
3455                 rcu_read_unlock();
3456                 return -ETIMEDOUT;
3457         }
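        /* Editor's note: nc->timeout is configured in tenths of a second, so
         * "timeout * HZ / 10" converts it to jiffies before scaling by
         * ko_count; a ko_count of zero means wait without a time limit. */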
3458         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3459         rcu_read_unlock();
3460
3461         /* Indicate that mdev->misc_wait should be woken up on progress.  */
3462         i->waiting = true;
3463         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3464         spin_unlock_irq(&mdev->tconn->req_lock);
3465         timeout = schedule_timeout(timeout);
3466         finish_wait(&mdev->misc_wait, &wait);
3467         spin_lock_irq(&mdev->tconn->req_lock);
3468         if (!timeout || mdev->state.conn < C_CONNECTED)
3469                 return -ETIMEDOUT;
3470         if (signal_pending(current))
3471                 return -ERESTARTSYS;
3472         return 0;
3473 }
3474
3475 #ifdef CONFIG_DRBD_FAULT_INJECTION
3476 /* Fault insertion support including random number generator shamelessly
3477  * stolen from kernel/rcutorture.c */
3478 struct fault_random_state {
3479         unsigned long state;
3480         unsigned long count;
3481 };
3482
3483 #define FAULT_RANDOM_MULT 39916801  /* prime */
3484 #define FAULT_RANDOM_ADD        479001701 /* prime */
3485 #define FAULT_RANDOM_REFRESH 10000
3486
3487 /*
3488  * Crude but fast random-number generator.  Uses a linear congruential
3489  * generator, with occasional help from get_random_bytes().
3490  */
3491 static unsigned long
3492 _drbd_fault_random(struct fault_random_state *rsp)
3493 {
3494         long refresh;
3495
3496         if (!rsp->count--) {
3497                 get_random_bytes(&refresh, sizeof(refresh));
3498                 rsp->state += refresh;
3499                 rsp->count = FAULT_RANDOM_REFRESH;
3500         }
3501         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3502         return swahw32(rsp->state);
3503 }
3504
3505 static char *
3506 _drbd_fault_str(unsigned int type) {
3507         static char *_faults[] = {
3508                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3509                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3510                 [DRBD_FAULT_RS_WR] = "Resync write",
3511                 [DRBD_FAULT_RS_RD] = "Resync read",
3512                 [DRBD_FAULT_DT_WR] = "Data write",
3513                 [DRBD_FAULT_DT_RD] = "Data read",
3514                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3515                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3516                 [DRBD_FAULT_AL_EE] = "EE allocation",
3517                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3518         };
3519
3520         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3521 }
3522
3523 unsigned int
3524 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3525 {
3526         static struct fault_random_state rrs = {0, 0};
3527
3528         unsigned int ret = (
3529                 (fault_devs == 0 ||
3530                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3531                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3532
3533         if (ret) {
3534                 fault_count++;
3535
3536                 if (__ratelimit(&drbd_ratelimit_state))
3537                         dev_warn(DEV, "***Simulating %s failure\n",
3538                                 _drbd_fault_str(type));
3539         }
3540
3541         return ret;
3542 }
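/*
 * Editor's sketch of a typical consumer (hypothetical fragment, using the
 * bio API of this kernel generation): IO submission paths ask the injector
 * whether to simulate a failure before actually submitting:
 *
 *	if (_drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		bio_endio(bio, -EIO);		// pretend the write failed
 *	else
 *		submit_bio(WRITE, bio);
 */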
3543 #endif
3544
3545 const char *drbd_buildtag(void)
3546 {
3547         /* When DRBD is built from external sources, this holds a reference
3548            to the git hash of the source code. */
3549
3550         static char buildtag[38] = "\0uilt-in";
3551
3552         if (buildtag[0] == 0) {
3553 #ifdef CONFIG_MODULES
3554                 if (THIS_MODULE != NULL)
3555                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3556                 else
3557 #endif
3558                         buildtag[0] = 'b';
3559         }
3560
3561         return buildtag;
3562 }
3563
3564 module_init(drbd_init)
3565 module_exit(drbd_cleanup)
3566
3567 EXPORT_SYMBOL(drbd_conn_str);
3568 EXPORT_SYMBOL(drbd_role_str);
3569 EXPORT_SYMBOL(drbd_disk_str);
3570 EXPORT_SYMBOL(drbd_set_st_err_str);