drbd: log request sector offset and size for IO errors
[firefly-linux-kernel-4.4.55.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 enum finish_epoch {
52         FE_STILL_LIVE,
53         FE_DESTROYED,
54         FE_RECYCLED,
55 };
56
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111         struct page *tmp;
112         int i = 1;
113         while ((tmp = page_chain_next(page)))
114                 ++i, page = tmp;
115         if (len)
116                 *len = i;
117         return page;
118 }
119
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147         struct page *page = NULL;
148         struct page *tmp = NULL;
149         int i = 0;
150
151         /* Yes, testing drbd_pp_vacant outside the lock is racy.
152          * So what. It saves a spin_lock. */
153         if (drbd_pp_vacant >= number) {
154                 spin_lock(&drbd_pp_lock);
155                 page = page_chain_del(&drbd_pp_pool, number);
156                 if (page)
157                         drbd_pp_vacant -= number;
158                 spin_unlock(&drbd_pp_lock);
159                 if (page)
160                         return page;
161         }
162
163         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164          * "criss-cross" setup, that might cause write-out on some other DRBD,
165          * which in turn might block on the other node at this very place.  */
166         for (i = 0; i < number; i++) {
167                 tmp = alloc_page(GFP_TRY);
168                 if (!tmp)
169                         break;
170                 set_page_private(tmp, (unsigned long)page);
171                 page = tmp;
172         }
173
174         if (i == number)
175                 return page;
176
177         /* Not enough pages immediately available this time.
178          * No need to jump around here, drbd_pp_alloc will retry this
179          * function "soon". */
180         if (page) {
181                 tmp = page_chain_tail(page, NULL);
182                 spin_lock(&drbd_pp_lock);
183                 page_chain_add(&drbd_pp_pool, page, tmp);
184                 drbd_pp_vacant += i;
185                 spin_unlock(&drbd_pp_lock);
186         }
187         return NULL;
188 }
189
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192         struct drbd_epoch_entry *e;
193         struct list_head *le, *tle;
194
195         /* The EEs are always appended to the end of the list. Since
196            they are sent in order over the wire, they have to finish
197            in order. As soon as we see the first not finished we can
198            stop to examine the list... */
199
200         list_for_each_safe(le, tle, &mdev->net_ee) {
201                 e = list_entry(le, struct drbd_epoch_entry, w.list);
202                 if (drbd_ee_has_active_page(e))
203                         break;
204                 list_move(le, to_be_freed);
205         }
206 }
207
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_epoch_entry *e, *t;
212
213         spin_lock_irq(&mdev->req_lock);
214         reclaim_net_ee(mdev, &reclaimed);
215         spin_unlock_irq(&mdev->req_lock);
216
217         list_for_each_entry_safe(e, t, &reclaimed, w.list)
218                 drbd_free_net_ee(mdev, e);
219 }
220
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:       DRBD device.
224  * @number:     number of pages requested
225  * @retry:      whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235         struct page *page = NULL;
236         DEFINE_WAIT(wait);
237
238         /* Yes, we may run up to @number over max_buffers. If we
239          * follow it strictly, the admin will get it wrong anyways. */
240         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243         while (page == NULL) {
244                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246                 drbd_kick_lo_and_reclaim_net(mdev);
247
248                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250                         if (page)
251                                 break;
252                 }
253
254                 if (!retry)
255                         break;
256
257                 if (signal_pending(current)) {
258                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259                         break;
260                 }
261
262                 schedule();
263         }
264         finish_wait(&drbd_pp_wait, &wait);
265
266         if (page)
267                 atomic_add(number, &mdev->pp_in_use);
268         return page;
269 }
270
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278         int i;
279
280         if (page == NULL)
281                 return;
282
283         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
284                 i = page_chain_free(page);
285         else {
286                 struct page *tmp;
287                 tmp = page_chain_tail(page, &i);
288                 spin_lock(&drbd_pp_lock);
289                 page_chain_add(&drbd_pp_pool, page, tmp);
290                 drbd_pp_vacant += i;
291                 spin_unlock(&drbd_pp_lock);
292         }
293         i = atomic_sub_return(i, a);
294         if (i < 0)
295                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
296                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
297         wake_up(&drbd_pp_wait);
298 }
299
300 /*
301 You need to hold the req_lock:
302  _drbd_wait_ee_list_empty()
303
304 You must not have the req_lock:
305  drbd_free_ee()
306  drbd_alloc_ee()
307  drbd_init_ee()
308  drbd_release_ee()
309  drbd_ee_fix_bhs()
310  drbd_process_done_ee()
311  drbd_clear_done_ee()
312  drbd_wait_ee_list_empty()
313 */
314
315 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
316                                      u64 id,
317                                      sector_t sector,
318                                      unsigned int data_size,
319                                      gfp_t gfp_mask) __must_hold(local)
320 {
321         struct drbd_epoch_entry *e;
322         struct page *page = NULL;
323         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
324
325         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
326                 return NULL;
327
328         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
329         if (!e) {
330                 if (!(gfp_mask & __GFP_NOWARN))
331                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
332                 return NULL;
333         }
334
335         if (data_size) {
336                 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
337                 if (!page)
338                         goto fail;
339         }
340
341         INIT_HLIST_NODE(&e->collision);
342         e->epoch = NULL;
343         e->mdev = mdev;
344         e->pages = page;
345         atomic_set(&e->pending_bios, 0);
346         e->size = data_size;
347         e->flags = 0;
348         e->sector = sector;
349         e->block_id = id;
350
351         return e;
352
353  fail:
354         mempool_free(e, drbd_ee_mempool);
355         return NULL;
356 }
357
358 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
359 {
360         if (e->flags & EE_HAS_DIGEST)
361                 kfree(e->digest);
362         drbd_pp_free(mdev, e->pages, is_net);
363         D_ASSERT(atomic_read(&e->pending_bios) == 0);
364         D_ASSERT(hlist_unhashed(&e->collision));
365         mempool_free(e, drbd_ee_mempool);
366 }
367
368 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
369 {
370         LIST_HEAD(work_list);
371         struct drbd_epoch_entry *e, *t;
372         int count = 0;
373         int is_net = list == &mdev->net_ee;
374
375         spin_lock_irq(&mdev->req_lock);
376         list_splice_init(list, &work_list);
377         spin_unlock_irq(&mdev->req_lock);
378
379         list_for_each_entry_safe(e, t, &work_list, w.list) {
380                 drbd_free_some_ee(mdev, e, is_net);
381                 count++;
382         }
383         return count;
384 }
385
386
387 /*
388  * This function is called from _asender only_
389  * but see also comments in _req_mod(,barrier_acked)
390  * and receive_Barrier.
391  *
392  * Move entries from net_ee to done_ee, if ready.
393  * Grab done_ee, call all callbacks, free the entries.
394  * The callbacks typically send out ACKs.
395  */
396 static int drbd_process_done_ee(struct drbd_conf *mdev)
397 {
398         LIST_HEAD(work_list);
399         LIST_HEAD(reclaimed);
400         struct drbd_epoch_entry *e, *t;
401         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
402
403         spin_lock_irq(&mdev->req_lock);
404         reclaim_net_ee(mdev, &reclaimed);
405         list_splice_init(&mdev->done_ee, &work_list);
406         spin_unlock_irq(&mdev->req_lock);
407
408         list_for_each_entry_safe(e, t, &reclaimed, w.list)
409                 drbd_free_net_ee(mdev, e);
410
411         /* possible callbacks here:
412          * e_end_block, and e_end_resync_block, e_send_discard_ack.
413          * all ignore the last argument.
414          */
415         list_for_each_entry_safe(e, t, &work_list, w.list) {
416                 /* list_del not necessary, next/prev members not touched */
417                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
418                 drbd_free_ee(mdev, e);
419         }
420         wake_up(&mdev->ee_wait);
421
422         return ok;
423 }
424
425 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
426 {
427         DEFINE_WAIT(wait);
428
429         /* avoids spin_lock/unlock
430          * and calling prepare_to_wait in the fast path */
431         while (!list_empty(head)) {
432                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
433                 spin_unlock_irq(&mdev->req_lock);
434                 io_schedule();
435                 finish_wait(&mdev->ee_wait, &wait);
436                 spin_lock_irq(&mdev->req_lock);
437         }
438 }
439
440 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441 {
442         spin_lock_irq(&mdev->req_lock);
443         _drbd_wait_ee_list_empty(mdev, head);
444         spin_unlock_irq(&mdev->req_lock);
445 }
446
447 /* see also kernel_accept; which is only present since 2.6.18.
448  * also we want to log which part of it failed, exactly */
449 static int drbd_accept(struct drbd_conf *mdev, const char **what,
450                 struct socket *sock, struct socket **newsock)
451 {
452         struct sock *sk = sock->sk;
453         int err = 0;
454
455         *what = "listen";
456         err = sock->ops->listen(sock, 5);
457         if (err < 0)
458                 goto out;
459
460         *what = "sock_create_lite";
461         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
462                                newsock);
463         if (err < 0)
464                 goto out;
465
466         *what = "accept";
467         err = sock->ops->accept(sock, *newsock, 0);
468         if (err < 0) {
469                 sock_release(*newsock);
470                 *newsock = NULL;
471                 goto out;
472         }
473         (*newsock)->ops  = sock->ops;
474         __module_get((*newsock)->ops->owner);
475
476 out:
477         return err;
478 }
479
480 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
481                     void *buf, size_t size, int flags)
482 {
483         mm_segment_t oldfs;
484         struct kvec iov = {
485                 .iov_base = buf,
486                 .iov_len = size,
487         };
488         struct msghdr msg = {
489                 .msg_iovlen = 1,
490                 .msg_iov = (struct iovec *)&iov,
491                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
492         };
493         int rv;
494
495         oldfs = get_fs();
496         set_fs(KERNEL_DS);
497         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
498         set_fs(oldfs);
499
500         return rv;
501 }
502
503 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
504 {
505         mm_segment_t oldfs;
506         struct kvec iov = {
507                 .iov_base = buf,
508                 .iov_len = size,
509         };
510         struct msghdr msg = {
511                 .msg_iovlen = 1,
512                 .msg_iov = (struct iovec *)&iov,
513                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
514         };
515         int rv;
516
517         oldfs = get_fs();
518         set_fs(KERNEL_DS);
519         rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
520         set_fs(oldfs);
521
522         if (rv < 0) {
523                 if (rv == -ECONNRESET)
524                         dev_info(DEV, "sock was reset by peer\n");
525                 else if (rv != -ERESTARTSYS)
526                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
527         } else if (rv == 0) {
528                 if (drbd_test_flag(mdev, DISCONNECT_SENT)) {
529                         long t; /* time_left */
530                         t = wait_event_timeout(mdev->state_wait, mdev->state.conn < C_CONNECTED,
531                                                mdev->net_conf->ping_timeo * HZ/10);
532                         if (t)
533                                 goto out;
534                 }
535                 dev_info(DEV, "sock was shut down by peer\n");
536         }
537
538         if (rv != size)
539                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
540
541 out:
542         return rv;
543 }
544
545 /* quoting tcp(7):
546  *   On individual connections, the socket buffer size must be set prior to the
547  *   listen(2) or connect(2) calls in order to have it take effect.
548  * This is our wrapper to do so.
549  */
550 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
551                 unsigned int rcv)
552 {
553         /* open coded SO_SNDBUF, SO_RCVBUF */
554         if (snd) {
555                 sock->sk->sk_sndbuf = snd;
556                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
557         }
558         if (rcv) {
559                 sock->sk->sk_rcvbuf = rcv;
560                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
561         }
562 }
563
564 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
565 {
566         const char *what;
567         struct socket *sock;
568         struct sockaddr_in6 src_in6;
569         int err;
570         int disconnect_on_error = 1;
571
572         if (!get_net_conf(mdev))
573                 return NULL;
574
575         what = "sock_create_kern";
576         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
577                 SOCK_STREAM, IPPROTO_TCP, &sock);
578         if (err < 0) {
579                 sock = NULL;
580                 goto out;
581         }
582
583         sock->sk->sk_rcvtimeo =
584         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
585         drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
586                         mdev->net_conf->rcvbuf_size);
587
588        /* explicitly bind to the configured IP as source IP
589         *  for the outgoing connections.
590         *  This is needed for multihomed hosts and to be
591         *  able to use lo: interfaces for drbd.
592         * Make sure to use 0 as port number, so linux selects
593         *  a free one dynamically.
594         */
595         memcpy(&src_in6, mdev->net_conf->my_addr,
596                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
597         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
598                 src_in6.sin6_port = 0;
599         else
600                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
601
602         what = "bind before connect";
603         err = sock->ops->bind(sock,
604                               (struct sockaddr *) &src_in6,
605                               mdev->net_conf->my_addr_len);
606         if (err < 0)
607                 goto out;
608
609         /* connect may fail, peer not yet available.
610          * stay C_WF_CONNECTION, don't go Disconnecting! */
611         disconnect_on_error = 0;
612         what = "connect";
613         err = sock->ops->connect(sock,
614                                  (struct sockaddr *)mdev->net_conf->peer_addr,
615                                  mdev->net_conf->peer_addr_len, 0);
616
617 out:
618         if (err < 0) {
619                 if (sock) {
620                         sock_release(sock);
621                         sock = NULL;
622                 }
623                 switch (-err) {
624                         /* timeout, busy, signal pending */
625                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
626                 case EINTR: case ERESTARTSYS:
627                         /* peer not (yet) available, network problem */
628                 case ECONNREFUSED: case ENETUNREACH:
629                 case EHOSTDOWN:    case EHOSTUNREACH:
630                         disconnect_on_error = 0;
631                         break;
632                 default:
633                         dev_err(DEV, "%s failed, err = %d\n", what, err);
634                 }
635                 if (disconnect_on_error)
636                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
637         }
638         put_net_conf(mdev);
639         return sock;
640 }
641
642 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
643 {
644         int timeo, err;
645         struct socket *s_estab = NULL, *s_listen;
646         const char *what;
647
648         if (!get_net_conf(mdev))
649                 return NULL;
650
651         what = "sock_create_kern";
652         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
653                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
654         if (err) {
655                 s_listen = NULL;
656                 goto out;
657         }
658
659         timeo = mdev->net_conf->try_connect_int * HZ;
660         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
661
662         s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
663         s_listen->sk->sk_rcvtimeo = timeo;
664         s_listen->sk->sk_sndtimeo = timeo;
665         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
666                         mdev->net_conf->rcvbuf_size);
667
668         what = "bind before listen";
669         err = s_listen->ops->bind(s_listen,
670                               (struct sockaddr *) mdev->net_conf->my_addr,
671                               mdev->net_conf->my_addr_len);
672         if (err < 0)
673                 goto out;
674
675         err = drbd_accept(mdev, &what, s_listen, &s_estab);
676
677 out:
678         if (s_listen)
679                 sock_release(s_listen);
680         if (err < 0) {
681                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
682                         dev_err(DEV, "%s failed, err = %d\n", what, err);
683                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
684                 }
685         }
686         put_net_conf(mdev);
687
688         return s_estab;
689 }
690
691 static int drbd_send_fp(struct drbd_conf *mdev,
692         struct socket *sock, enum drbd_packets cmd)
693 {
694         struct p_header80 *h = &mdev->data.sbuf.header.h80;
695
696         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
697 }
698
699 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
700 {
701         struct p_header80 *h = &mdev->data.rbuf.header.h80;
702         int rr;
703
704         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
705
706         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
707                 return be16_to_cpu(h->command);
708
709         return 0xffff;
710 }
711
712 /**
713  * drbd_socket_okay() - Free the socket if its connection is not okay
714  * @mdev:       DRBD device.
715  * @sock:       pointer to the pointer to the socket.
716  */
717 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
718 {
719         int rr;
720         char tb[4];
721
722         if (!*sock)
723                 return false;
724
725         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
726
727         if (rr > 0 || rr == -EAGAIN) {
728                 return true;
729         } else {
730                 sock_release(*sock);
731                 *sock = NULL;
732                 return false;
733         }
734 }
735
736 /*
737  * return values:
738  *   1 yes, we have a valid connection
739  *   0 oops, did not work out, please try again
740  *  -1 peer talks different language,
741  *     no point in trying again, please go standalone.
742  *  -2 We do not have a network config...
743  */
744 static int drbd_connect(struct drbd_conf *mdev)
745 {
746         struct socket *s, *sock, *msock;
747         int try, h, ok;
748         enum drbd_state_rv rv;
749
750         D_ASSERT(!mdev->data.socket);
751
752         drbd_clear_flag(mdev, DISCONNECT_SENT);
753         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
754                 return -2;
755
756         sock  = NULL;
757         msock = NULL;
758
759         do {
760                 for (try = 0;;) {
761                         /* 3 tries, this should take less than a second! */
762                         s = drbd_try_connect(mdev);
763                         if (s || ++try >= 3)
764                                 break;
765                         /* give the other side time to call bind() & listen() */
766                         schedule_timeout_interruptible(HZ / 10);
767                 }
768
769                 if (s) {
770                         if (!sock) {
771                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
772                                 sock = s;
773                                 s = NULL;
774                         } else if (!msock) {
775                                 drbd_clear_flag(mdev, DISCARD_CONCURRENT);
776                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
777                                 msock = s;
778                                 s = NULL;
779                         } else {
780                                 dev_err(DEV, "Logic error in drbd_connect()\n");
781                                 goto out_release_sockets;
782                         }
783                 }
784
785                 if (sock && msock) {
786                         schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
787                         ok = drbd_socket_okay(mdev, &sock);
788                         ok = drbd_socket_okay(mdev, &msock) && ok;
789                         if (ok)
790                                 break;
791                 }
792
793 retry:
794                 s = drbd_wait_for_connect(mdev);
795                 if (s) {
796                         try = drbd_recv_fp(mdev, s);
797                         drbd_socket_okay(mdev, &sock);
798                         drbd_socket_okay(mdev, &msock);
799                         switch (try) {
800                         case P_HAND_SHAKE_S:
801                                 if (sock) {
802                                         dev_warn(DEV, "initial packet S crossed\n");
803                                         sock_release(sock);
804                                 }
805                                 sock = s;
806                                 break;
807                         case P_HAND_SHAKE_M:
808                                 if (msock) {
809                                         dev_warn(DEV, "initial packet M crossed\n");
810                                         sock_release(msock);
811                                 }
812                                 msock = s;
813                                 drbd_set_flag(mdev, DISCARD_CONCURRENT);
814                                 break;
815                         default:
816                                 dev_warn(DEV, "Error receiving initial packet\n");
817                                 sock_release(s);
818                                 if (random32() & 1)
819                                         goto retry;
820                         }
821                 }
822
823                 if (mdev->state.conn <= C_DISCONNECTING)
824                         goto out_release_sockets;
825                 if (signal_pending(current)) {
826                         flush_signals(current);
827                         smp_rmb();
828                         if (get_t_state(&mdev->receiver) == Exiting)
829                                 goto out_release_sockets;
830                 }
831
832                 if (sock && msock) {
833                         ok = drbd_socket_okay(mdev, &sock);
834                         ok = drbd_socket_okay(mdev, &msock) && ok;
835                         if (ok)
836                                 break;
837                 }
838         } while (1);
839
840         msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
841         sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
842
843         sock->sk->sk_allocation = GFP_NOIO;
844         msock->sk->sk_allocation = GFP_NOIO;
845
846         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
847         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
848
849         /* NOT YET ...
850          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
851          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
852          * first set it to the P_HAND_SHAKE timeout,
853          * which we set to 4x the configured ping_timeout. */
854         sock->sk->sk_sndtimeo =
855         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
856
857         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
858         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
859
860         /* we don't want delays.
861          * we use TCP_CORK where appropriate, though */
862         drbd_tcp_nodelay(sock);
863         drbd_tcp_nodelay(msock);
864
865         mdev->data.socket = sock;
866         mdev->meta.socket = msock;
867         mdev->last_received = jiffies;
868
869         D_ASSERT(mdev->asender.task == NULL);
870
871         h = drbd_do_handshake(mdev);
872         if (h <= 0)
873                 return h;
874
875         if (mdev->cram_hmac_tfm) {
876                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
877                 switch (drbd_do_auth(mdev)) {
878                 case -1:
879                         dev_err(DEV, "Authentication of peer failed\n");
880                         return -1;
881                 case 0:
882                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
883                         return 0;
884                 }
885         }
886
887         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
888         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
889
890         atomic_set(&mdev->packet_seq, 0);
891         mdev->peer_seq = 0;
892
893         if (drbd_send_protocol(mdev) == -1)
894                 return -1;
895         drbd_set_flag(mdev, STATE_SENT);
896         drbd_send_sync_param(mdev, &mdev->sync_conf);
897         drbd_send_sizes(mdev, 0, 0);
898         drbd_send_uuids(mdev);
899         drbd_send_current_state(mdev);
900         drbd_clear_flag(mdev, USE_DEGR_WFC_T);
901         drbd_clear_flag(mdev, RESIZE_PENDING);
902
903         spin_lock_irq(&mdev->req_lock);
904         rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
905         if (mdev->state.conn != C_WF_REPORT_PARAMS)
906                 drbd_clear_flag(mdev, STATE_SENT);
907         spin_unlock_irq(&mdev->req_lock);
908
909         if (rv < SS_SUCCESS)
910                 return 0;
911
912         drbd_thread_start(&mdev->asender);
913         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
914
915         return 1;
916
917 out_release_sockets:
918         if (sock)
919                 sock_release(sock);
920         if (msock)
921                 sock_release(msock);
922         return -1;
923 }
924
925 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
926 {
927         union p_header *h = &mdev->data.rbuf.header;
928         int r;
929
930         r = drbd_recv(mdev, h, sizeof(*h));
931         if (unlikely(r != sizeof(*h))) {
932                 if (!signal_pending(current))
933                         dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
934                 return false;
935         }
936
937         if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
938                 *cmd = be16_to_cpu(h->h80.command);
939                 *packet_size = be16_to_cpu(h->h80.length);
940         } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
941                 *cmd = be16_to_cpu(h->h95.command);
942                 *packet_size = be32_to_cpu(h->h95.length);
943         } else {
944                 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
945                     be32_to_cpu(h->h80.magic),
946                     be16_to_cpu(h->h80.command),
947                     be16_to_cpu(h->h80.length));
948                 return false;
949         }
950         mdev->last_received = jiffies;
951
952         return true;
953 }
954
955 static void drbd_flush(struct drbd_conf *mdev)
956 {
957         int rv;
958
959         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
960                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_NOIO,
961                                         NULL);
962                 if (rv) {
963                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
964                         /* would rather check on EOPNOTSUPP, but that is not reliable.
965                          * don't try again for ANY return value != 0
966                          * if (rv == -EOPNOTSUPP) */
967                         drbd_bump_write_ordering(mdev, WO_drain_io);
968                 }
969                 put_ldev(mdev);
970         }
971 }
972
973 /**
974  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
975  * @mdev:       DRBD device.
976  * @epoch:      Epoch object.
977  * @ev:         Epoch event.
978  */
979 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
980                                                struct drbd_epoch *epoch,
981                                                enum epoch_event ev)
982 {
983         int epoch_size;
984         struct drbd_epoch *next_epoch;
985         enum finish_epoch rv = FE_STILL_LIVE;
986
987         spin_lock(&mdev->epoch_lock);
988         do {
989                 next_epoch = NULL;
990
991                 epoch_size = atomic_read(&epoch->epoch_size);
992
993                 switch (ev & ~EV_CLEANUP) {
994                 case EV_PUT:
995                         atomic_dec(&epoch->active);
996                         break;
997                 case EV_GOT_BARRIER_NR:
998                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
999                         break;
1000                 case EV_BECAME_LAST:
1001                         /* nothing to do*/
1002                         break;
1003                 }
1004
1005                 if (epoch_size != 0 &&
1006                     atomic_read(&epoch->active) == 0 &&
1007                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1008                         if (!(ev & EV_CLEANUP)) {
1009                                 spin_unlock(&mdev->epoch_lock);
1010                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1011                                 spin_lock(&mdev->epoch_lock);
1012                         }
1013                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1014                                 dec_unacked(mdev);
1015
1016                         if (mdev->current_epoch != epoch) {
1017                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1018                                 list_del(&epoch->list);
1019                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1020                                 mdev->epochs--;
1021                                 kfree(epoch);
1022
1023                                 if (rv == FE_STILL_LIVE)
1024                                         rv = FE_DESTROYED;
1025                         } else {
1026                                 epoch->flags = 0;
1027                                 atomic_set(&epoch->epoch_size, 0);
1028                                 /* atomic_set(&epoch->active, 0); is already zero */
1029                                 if (rv == FE_STILL_LIVE)
1030                                         rv = FE_RECYCLED;
1031                                 wake_up(&mdev->ee_wait);
1032                         }
1033                 }
1034
1035                 if (!next_epoch)
1036                         break;
1037
1038                 epoch = next_epoch;
1039         } while (1);
1040
1041         spin_unlock(&mdev->epoch_lock);
1042
1043         return rv;
1044 }
1045
1046 /**
1047  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1048  * @mdev:       DRBD device.
1049  * @wo:         Write ordering method to try.
1050  */
1051 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1052 {
1053         enum write_ordering_e pwo;
1054         static char *write_ordering_str[] = {
1055                 [WO_none] = "none",
1056                 [WO_drain_io] = "drain",
1057                 [WO_bdev_flush] = "flush",
1058         };
1059
1060         pwo = mdev->write_ordering;
1061         wo = min(pwo, wo);
1062         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1063                 wo = WO_drain_io;
1064         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1065                 wo = WO_none;
1066         mdev->write_ordering = wo;
1067         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1068                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1069 }
1070
1071 /**
1072  * drbd_submit_ee()
1073  * @mdev:       DRBD device.
1074  * @e:          epoch entry
1075  * @rw:         flag field, see bio->bi_rw
1076  *
1077  * May spread the pages to multiple bios,
1078  * depending on bio_add_page restrictions.
1079  *
1080  * Returns 0 if all bios have been submitted,
1081  * -ENOMEM if we could not allocate enough bios,
1082  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1083  *  single page to an empty bio (which should never happen and likely indicates
1084  *  that the lower level IO stack is in some way broken). This has been observed
1085  *  on certain Xen deployments.
1086  */
1087 /* TODO allocate from our own bio_set. */
1088 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1089                 const unsigned rw, const int fault_type)
1090 {
1091         struct bio *bios = NULL;
1092         struct bio *bio;
1093         struct page *page = e->pages;
1094         sector_t sector = e->sector;
1095         unsigned ds = e->size;
1096         unsigned n_bios = 0;
1097         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1098         int err = -ENOMEM;
1099
1100         /* In most cases, we will only need one bio.  But in case the lower
1101          * level restrictions happen to be different at this offset on this
1102          * side than those of the sending peer, we may need to submit the
1103          * request in more than one bio.
1104          *
1105          * Plain bio_alloc is good enough here, this is no DRBD internally
1106          * generated bio, but a bio allocated on behalf of the peer.
1107          */
1108 next_bio:
1109         bio = bio_alloc(GFP_NOIO, nr_pages);
1110         if (!bio) {
1111                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1112                 goto fail;
1113         }
1114         /* > e->sector, unless this is the first bio */
1115         bio->bi_sector = sector;
1116         bio->bi_bdev = mdev->ldev->backing_bdev;
1117         bio->bi_rw = rw;
1118         bio->bi_private = e;
1119         bio->bi_end_io = drbd_endio_sec;
1120
1121         bio->bi_next = bios;
1122         bios = bio;
1123         ++n_bios;
1124
1125         page_chain_for_each(page) {
1126                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1127                 if (!bio_add_page(bio, page, len, 0)) {
1128                         /* A single page must always be possible!
1129                          * But in case it fails anyways,
1130                          * we deal with it, and complain (below). */
1131                         if (bio->bi_vcnt == 0) {
1132                                 dev_err(DEV,
1133                                         "bio_add_page failed for len=%u, "
1134                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1135                                         len, (unsigned long long)bio->bi_sector);
1136                                 err = -ENOSPC;
1137                                 goto fail;
1138                         }
1139                         goto next_bio;
1140                 }
1141                 ds -= len;
1142                 sector += len >> 9;
1143                 --nr_pages;
1144         }
1145         D_ASSERT(page == NULL);
1146         D_ASSERT(ds == 0);
1147
1148         atomic_set(&e->pending_bios, n_bios);
1149         do {
1150                 bio = bios;
1151                 bios = bios->bi_next;
1152                 bio->bi_next = NULL;
1153
1154                 drbd_generic_make_request(mdev, fault_type, bio);
1155         } while (bios);
1156         return 0;
1157
1158 fail:
1159         while (bios) {
1160                 bio = bios;
1161                 bios = bios->bi_next;
1162                 bio_put(bio);
1163         }
1164         return err;
1165 }
1166
1167 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1168 {
1169         int rv;
1170         struct p_barrier *p = &mdev->data.rbuf.barrier;
1171         struct drbd_epoch *epoch;
1172
1173         inc_unacked(mdev);
1174
1175         mdev->current_epoch->barrier_nr = p->barrier;
1176         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1177
1178         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1179          * the activity log, which means it would not be resynced in case the
1180          * R_PRIMARY crashes now.
1181          * Therefore we must send the barrier_ack after the barrier request was
1182          * completed. */
1183         switch (mdev->write_ordering) {
1184         case WO_none:
1185                 if (rv == FE_RECYCLED)
1186                         return true;
1187
1188                 /* receiver context, in the writeout path of the other node.
1189                  * avoid potential distributed deadlock */
1190                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1191                 if (epoch)
1192                         break;
1193                 else
1194                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1195                         /* Fall through */
1196
1197         case WO_bdev_flush:
1198         case WO_drain_io:
1199                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1200                 drbd_flush(mdev);
1201
1202                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1203                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1204                         if (epoch)
1205                                 break;
1206                 }
1207
1208                 epoch = mdev->current_epoch;
1209                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1210
1211                 D_ASSERT(atomic_read(&epoch->active) == 0);
1212                 D_ASSERT(epoch->flags == 0);
1213
1214                 return true;
1215         default:
1216                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1217                 return false;
1218         }
1219
1220         epoch->flags = 0;
1221         atomic_set(&epoch->epoch_size, 0);
1222         atomic_set(&epoch->active, 0);
1223
1224         spin_lock(&mdev->epoch_lock);
1225         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1226                 list_add(&epoch->list, &mdev->current_epoch->list);
1227                 mdev->current_epoch = epoch;
1228                 mdev->epochs++;
1229         } else {
1230                 /* The current_epoch got recycled while we allocated this one... */
1231                 kfree(epoch);
1232         }
1233         spin_unlock(&mdev->epoch_lock);
1234
1235         return true;
1236 }
1237
1238 /* used from receive_RSDataReply (recv_resync_read)
1239  * and from receive_Data */
1240 static struct drbd_epoch_entry *
1241 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1242 {
1243         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1244         struct drbd_epoch_entry *e;
1245         struct page *page;
1246         int dgs, ds, rr;
1247         void *dig_in = mdev->int_dig_in;
1248         void *dig_vv = mdev->int_dig_vv;
1249         unsigned long *data;
1250
1251         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1252                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1253
1254         if (dgs) {
1255                 rr = drbd_recv(mdev, dig_in, dgs);
1256                 if (rr != dgs) {
1257                         if (!signal_pending(current))
1258                                 dev_warn(DEV,
1259                                         "short read receiving data digest: read %d expected %d\n",
1260                                         rr, dgs);
1261                         return NULL;
1262                 }
1263         }
1264
1265         data_size -= dgs;
1266
1267         ERR_IF(data_size &  0x1ff) return NULL;
1268         ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1269
1270         /* even though we trust out peer,
1271          * we sometimes have to double check. */
1272         if (sector + (data_size>>9) > capacity) {
1273                 dev_err(DEV, "request from peer beyond end of local disk: "
1274                         "capacity: %llus < sector: %llus + size: %u\n",
1275                         (unsigned long long)capacity,
1276                         (unsigned long long)sector, data_size);
1277                 return NULL;
1278         }
1279
1280         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1281          * "criss-cross" setup, that might cause write-out on some other DRBD,
1282          * which in turn might block on the other node at this very place.  */
1283         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1284         if (!e)
1285                 return NULL;
1286
1287         if (!data_size)
1288                 return e;
1289
1290         ds = data_size;
1291         page = e->pages;
1292         page_chain_for_each(page) {
1293                 unsigned len = min_t(int, ds, PAGE_SIZE);
1294                 data = kmap(page);
1295                 rr = drbd_recv(mdev, data, len);
1296                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1297                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1298                         data[0] = data[0] ^ (unsigned long)-1;
1299                 }
1300                 kunmap(page);
1301                 if (rr != len) {
1302                         drbd_free_ee(mdev, e);
1303                         if (!signal_pending(current))
1304                                 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1305                                 rr, len);
1306                         return NULL;
1307                 }
1308                 ds -= rr;
1309         }
1310
1311         if (dgs) {
1312                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1313                 if (memcmp(dig_in, dig_vv, dgs)) {
1314                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1315                                 (unsigned long long)sector, data_size);
1316                         drbd_bcast_ee(mdev, "digest failed",
1317                                         dgs, dig_in, dig_vv, e);
1318                         drbd_free_ee(mdev, e);
1319                         return NULL;
1320                 }
1321         }
1322         mdev->recv_cnt += data_size>>9;
1323         return e;
1324 }
1325
1326 /* drbd_drain_block() just takes a data block
1327  * out of the socket input buffer, and discards it.
1328  */
1329 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1330 {
1331         struct page *page;
1332         int rr, rv = 1;
1333         void *data;
1334
1335         if (!data_size)
1336                 return true;
1337
1338         page = drbd_pp_alloc(mdev, 1, 1);
1339
1340         data = kmap(page);
1341         while (data_size) {
1342                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1343                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1344                         rv = 0;
1345                         if (!signal_pending(current))
1346                                 dev_warn(DEV,
1347                                         "short read receiving data: read %d expected %d\n",
1348                                         rr, min_t(int, data_size, PAGE_SIZE));
1349                         break;
1350                 }
1351                 data_size -= rr;
1352         }
1353         kunmap(page);
1354         drbd_pp_free(mdev, page, 0);
1355         return rv;
1356 }
1357
1358 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1359                            sector_t sector, int data_size)
1360 {
1361         struct bio_vec *bvec;
1362         struct bio *bio;
1363         int dgs, rr, i, expect;
1364         void *dig_in = mdev->int_dig_in;
1365         void *dig_vv = mdev->int_dig_vv;
1366
1367         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1368                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1369
1370         if (dgs) {
1371                 rr = drbd_recv(mdev, dig_in, dgs);
1372                 if (rr != dgs) {
1373                         if (!signal_pending(current))
1374                                 dev_warn(DEV,
1375                                         "short read receiving data reply digest: read %d expected %d\n",
1376                                         rr, dgs);
1377                         return 0;
1378                 }
1379         }
1380
1381         data_size -= dgs;
1382
1383         /* optimistically update recv_cnt.  if receiving fails below,
1384          * we disconnect anyways, and counters will be reset. */
1385         mdev->recv_cnt += data_size>>9;
1386
1387         bio = req->master_bio;
1388         D_ASSERT(sector == bio->bi_sector);
1389
1390         bio_for_each_segment(bvec, bio, i) {
1391                 expect = min_t(int, data_size, bvec->bv_len);
1392                 rr = drbd_recv(mdev,
1393                              kmap(bvec->bv_page)+bvec->bv_offset,
1394                              expect);
1395                 kunmap(bvec->bv_page);
1396                 if (rr != expect) {
1397                         if (!signal_pending(current))
1398                                 dev_warn(DEV, "short read receiving data reply: "
1399                                         "read %d expected %d\n",
1400                                         rr, expect);
1401                         return 0;
1402                 }
1403                 data_size -= rr;
1404         }
1405
1406         if (dgs) {
1407                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1408                 if (memcmp(dig_in, dig_vv, dgs)) {
1409                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1410                         return 0;
1411                 }
1412         }
1413
1414         D_ASSERT(data_size == 0);
1415         return 1;
1416 }
1417
1418 /* e_end_resync_block() is called via
1419  * drbd_process_done_ee() by asender only */
1420 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1421 {
1422         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1423         sector_t sector = e->sector;
1424         int ok;
1425
1426         D_ASSERT(hlist_unhashed(&e->collision));
1427
1428         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1429                 drbd_set_in_sync(mdev, sector, e->size);
1430                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1431         } else {
1432                 /* Record failure to sync */
1433                 drbd_rs_failed_io(mdev, sector, e->size);
1434
1435                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1436         }
1437         dec_unacked(mdev);
1438
1439         return ok;
1440 }
1441
1442 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1443 {
1444         struct drbd_epoch_entry *e;
1445
1446         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1447         if (!e)
1448                 goto fail;
1449
1450         dec_rs_pending(mdev);
1451
1452         inc_unacked(mdev);
1453         /* corresponding dec_unacked() in e_end_resync_block()
1454          * respective _drbd_clear_done_ee */
1455
1456         e->w.cb = e_end_resync_block;
1457
1458         spin_lock_irq(&mdev->req_lock);
1459         list_add(&e->w.list, &mdev->sync_ee);
1460         spin_unlock_irq(&mdev->req_lock);
1461
1462         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1463         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1464                 return true;
1465
1466         /* don't care for the reason here */
1467         dev_err(DEV, "submit failed, triggering re-connect\n");
1468         spin_lock_irq(&mdev->req_lock);
1469         list_del(&e->w.list);
1470         spin_unlock_irq(&mdev->req_lock);
1471
1472         drbd_free_ee(mdev, e);
1473 fail:
1474         put_ldev(mdev);
1475         return false;
1476 }
1477
1478 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1479 {
1480         struct drbd_request *req;
1481         sector_t sector;
1482         int ok;
1483         struct p_data *p = &mdev->data.rbuf.data;
1484
1485         sector = be64_to_cpu(p->sector);
1486
1487         spin_lock_irq(&mdev->req_lock);
1488         req = _ar_id_to_req(mdev, p->block_id, sector);
1489         spin_unlock_irq(&mdev->req_lock);
1490         if (unlikely(!req)) {
1491                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1492                 return false;
1493         }
1494
1495         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1496          * special casing it there for the various failure cases.
1497          * still no race with drbd_fail_pending_reads */
1498         ok = recv_dless_read(mdev, req, sector, data_size);
1499
1500         if (ok)
1501                 req_mod(req, data_received);
1502         /* else: nothing. handled from drbd_disconnect...
1503          * I don't think we may complete this just yet
1504          * in case we are "on-disconnect: freeze" */
1505
1506         return ok;
1507 }
1508
1509 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1510 {
1511         sector_t sector;
1512         int ok;
1513         struct p_data *p = &mdev->data.rbuf.data;
1514
1515         sector = be64_to_cpu(p->sector);
1516         D_ASSERT(p->block_id == ID_SYNCER);
1517
1518         if (get_ldev(mdev)) {
1519                 /* data is submitted to disk within recv_resync_read.
1520                  * corresponding put_ldev done below on error,
1521                  * or in drbd_endio_write_sec. */
1522                 ok = recv_resync_read(mdev, sector, data_size);
1523         } else {
1524                 if (__ratelimit(&drbd_ratelimit_state))
1525                         dev_err(DEV, "Can not write resync data to local disk.\n");
1526
1527                 ok = drbd_drain_block(mdev, data_size);
1528
1529                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1530         }
1531
1532         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1533
1534         return ok;
1535 }
1536
1537 /* e_end_block() is called via drbd_process_done_ee().
1538  * this means this function only runs in the asender thread
1539  */
1540 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1541 {
1542         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1543         sector_t sector = e->sector;
1544         int ok = 1, pcmd;
1545
1546         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1547                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1548                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1549                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1550                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1551                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1552                         ok &= drbd_send_ack(mdev, pcmd, e);
1553                         if (pcmd == P_RS_WRITE_ACK)
1554                                 drbd_set_in_sync(mdev, sector, e->size);
1555                 } else {
1556                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1557                         /* we expect it to be marked out of sync anyways...
1558                          * maybe assert this?  */
1559                 }
1560                 dec_unacked(mdev);
1561         }
1562         /* we delete from the conflict detection hash _after_ we sent out the
1563          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1564         if (mdev->net_conf->two_primaries) {
1565                 spin_lock_irq(&mdev->req_lock);
1566                 D_ASSERT(!hlist_unhashed(&e->collision));
1567                 hlist_del_init(&e->collision);
1568                 spin_unlock_irq(&mdev->req_lock);
1569         } else {
1570                 D_ASSERT(hlist_unhashed(&e->collision));
1571         }
1572
1573         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1574
1575         return ok;
1576 }
1577
1578 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1579 {
1580         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1581         int ok = 1;
1582
1583         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1584         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1585
1586         spin_lock_irq(&mdev->req_lock);
1587         D_ASSERT(!hlist_unhashed(&e->collision));
1588         hlist_del_init(&e->collision);
1589         spin_unlock_irq(&mdev->req_lock);
1590
1591         dec_unacked(mdev);
1592
1593         return ok;
1594 }
1595
1596 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1597 {
1598
1599         struct drbd_epoch_entry *rs_e;
1600         bool rv = 0;
1601
1602         spin_lock_irq(&mdev->req_lock);
1603         list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1604                 if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1605                         rv = 1;
1606                         break;
1607                 }
1608         }
1609         spin_unlock_irq(&mdev->req_lock);
1610
1611         return rv;
1612 }
1613
1614 /* Called from receive_Data.
1615  * Synchronize packets on sock with packets on msock.
1616  *
1617  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1618  * packet traveling on msock, they are still processed in the order they have
1619  * been sent.
1620  *
1621  * Note: we don't care for Ack packets overtaking P_DATA packets.
1622  *
1623  * In case packet_seq is larger than mdev->peer_seq number, there are
1624  * outstanding packets on the msock. We wait for them to arrive.
1625  * In case we are the logically next packet, we update mdev->peer_seq
1626  * ourselves. Correctly handles 32bit wrap around.
1627  *
1628  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1629  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1630  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1631  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1632  *
1633  * returns 0 if we may process the packet,
1634  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1635 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1636 {
1637         DEFINE_WAIT(wait);
1638         unsigned int p_seq;
1639         long timeout;
1640         int ret = 0;
1641         spin_lock(&mdev->peer_seq_lock);
1642         for (;;) {
1643                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1644                 if (seq_le(packet_seq, mdev->peer_seq+1))
1645                         break;
1646                 if (signal_pending(current)) {
1647                         ret = -ERESTARTSYS;
1648                         break;
1649                 }
1650                 p_seq = mdev->peer_seq;
1651                 spin_unlock(&mdev->peer_seq_lock);
1652                 timeout = schedule_timeout(30*HZ);
1653                 spin_lock(&mdev->peer_seq_lock);
1654                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1655                         ret = -ETIMEDOUT;
1656                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1657                         break;
1658                 }
1659         }
1660         finish_wait(&mdev->seq_wait, &wait);
1661         if (mdev->peer_seq+1 == packet_seq)
1662                 mdev->peer_seq++;
1663         spin_unlock(&mdev->peer_seq_lock);
1664         return ret;
1665 }
1666
1667 /* see also bio_flags_to_wire()
1668  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1669  * flags and back. We may replicate to other kernel versions. */
1670 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1671 {
1672         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1673                 (dpf & DP_FUA ? REQ_FUA : 0) |
1674                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1675                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1676 }
1677
1678 /* mirrored write */
1679 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1680 {
1681         sector_t sector;
1682         struct drbd_epoch_entry *e;
1683         struct p_data *p = &mdev->data.rbuf.data;
1684         int rw = WRITE;
1685         u32 dp_flags;
1686
1687         if (!get_ldev(mdev)) {
1688                 spin_lock(&mdev->peer_seq_lock);
1689                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1690                         mdev->peer_seq++;
1691                 spin_unlock(&mdev->peer_seq_lock);
1692
1693                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1694                 atomic_inc(&mdev->current_epoch->epoch_size);
1695                 return drbd_drain_block(mdev, data_size);
1696         }
1697
1698         /* get_ldev(mdev) successful.
1699          * Corresponding put_ldev done either below (on various errors),
1700          * or in drbd_endio_write_sec, if we successfully submit the data at
1701          * the end of this function. */
1702
1703         sector = be64_to_cpu(p->sector);
1704         e = read_in_block(mdev, p->block_id, sector, data_size);
1705         if (!e) {
1706                 put_ldev(mdev);
1707                 return false;
1708         }
1709
1710         e->w.cb = e_end_block;
1711
1712         dp_flags = be32_to_cpu(p->dp_flags);
1713         rw |= wire_flags_to_bio(mdev, dp_flags);
1714         if (e->pages == NULL) {
1715                 D_ASSERT(e->size == 0);
1716                 D_ASSERT(dp_flags & DP_FLUSH);
1717         }
1718
1719         if (dp_flags & DP_MAY_SET_IN_SYNC)
1720                 e->flags |= EE_MAY_SET_IN_SYNC;
1721
1722         spin_lock(&mdev->epoch_lock);
1723         e->epoch = mdev->current_epoch;
1724         atomic_inc(&e->epoch->epoch_size);
1725         atomic_inc(&e->epoch->active);
1726         spin_unlock(&mdev->epoch_lock);
1727
1728         /* I'm the receiver, I do hold a net_cnt reference. */
1729         if (!mdev->net_conf->two_primaries) {
1730                 spin_lock_irq(&mdev->req_lock);
1731         } else {
1732                 /* don't get the req_lock yet,
1733                  * we may sleep in drbd_wait_peer_seq */
1734                 const int size = e->size;
1735                 const int discard = drbd_test_flag(mdev, DISCARD_CONCURRENT);
1736                 DEFINE_WAIT(wait);
1737                 struct drbd_request *i;
1738                 struct hlist_node *n;
1739                 struct hlist_head *slot;
1740                 int first;
1741
1742                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1743                 BUG_ON(mdev->ee_hash == NULL);
1744                 BUG_ON(mdev->tl_hash == NULL);
1745
1746                 /* conflict detection and handling:
1747                  * 1. wait on the sequence number,
1748                  *    in case this data packet overtook ACK packets.
1749                  * 2. check our hash tables for conflicting requests.
1750                  *    we only need to walk the tl_hash, since an ee can not
1751                  *    have a conflict with an other ee: on the submitting
1752                  *    node, the corresponding req had already been conflicting,
1753                  *    and a conflicting req is never sent.
1754                  *
1755                  * Note: for two_primaries, we are protocol C,
1756                  * so there cannot be any request that is DONE
1757                  * but still on the transfer log.
1758                  *
1759                  * unconditionally add to the ee_hash.
1760                  *
1761                  * if no conflicting request is found:
1762                  *    submit.
1763                  *
1764                  * if any conflicting request is found
1765                  * that has not yet been acked,
1766                  * AND I have the "discard concurrent writes" flag:
1767                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1768                  *
1769                  * if any conflicting request is found:
1770                  *       block the receiver, waiting on misc_wait
1771                  *       until no more conflicting requests are there,
1772                  *       or we get interrupted (disconnect).
1773                  *
1774                  *       we do not just write after local io completion of those
1775                  *       requests, but only after req is done completely, i.e.
1776                  *       we wait for the P_DISCARD_ACK to arrive!
1777                  *
1778                  *       then proceed normally, i.e. submit.
1779                  */
1780                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1781                         goto out_interrupted;
1782
1783                 spin_lock_irq(&mdev->req_lock);
1784
1785                 hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1786
1787 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1788                 slot = tl_hash_slot(mdev, sector);
1789                 first = 1;
1790                 for (;;) {
1791                         int have_unacked = 0;
1792                         int have_conflict = 0;
1793                         prepare_to_wait(&mdev->misc_wait, &wait,
1794                                 TASK_INTERRUPTIBLE);
1795                         hlist_for_each_entry(i, n, slot, collision) {
1796                                 if (OVERLAPS) {
1797                                         /* only ALERT on first iteration,
1798                                          * we may be woken up early... */
1799                                         if (first)
1800                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1801                                                       " new: %llus +%u; pending: %llus +%u\n",
1802                                                       current->comm, current->pid,
1803                                                       (unsigned long long)sector, size,
1804                                                       (unsigned long long)i->sector, i->size);
1805                                         if (i->rq_state & RQ_NET_PENDING)
1806                                                 ++have_unacked;
1807                                         ++have_conflict;
1808                                 }
1809                         }
1810 #undef OVERLAPS
1811                         if (!have_conflict)
1812                                 break;
1813
1814                         /* Discard Ack only for the _first_ iteration */
1815                         if (first && discard && have_unacked) {
1816                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1817                                      (unsigned long long)sector);
1818                                 inc_unacked(mdev);
1819                                 e->w.cb = e_send_discard_ack;
1820                                 list_add_tail(&e->w.list, &mdev->done_ee);
1821
1822                                 spin_unlock_irq(&mdev->req_lock);
1823
1824                                 /* we could probably send that P_DISCARD_ACK ourselves,
1825                                  * but I don't like the receiver using the msock */
1826
1827                                 put_ldev(mdev);
1828                                 wake_asender(mdev);
1829                                 finish_wait(&mdev->misc_wait, &wait);
1830                                 return true;
1831                         }
1832
1833                         if (signal_pending(current)) {
1834                                 hlist_del_init(&e->collision);
1835
1836                                 spin_unlock_irq(&mdev->req_lock);
1837
1838                                 finish_wait(&mdev->misc_wait, &wait);
1839                                 goto out_interrupted;
1840                         }
1841
1842                         spin_unlock_irq(&mdev->req_lock);
1843                         if (first) {
1844                                 first = 0;
1845                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1846                                      "sec=%llus\n", (unsigned long long)sector);
1847                         } else if (discard) {
1848                                 /* we had none on the first iteration.
1849                                  * there must be none now. */
1850                                 D_ASSERT(have_unacked == 0);
1851                         }
1852                         schedule();
1853                         spin_lock_irq(&mdev->req_lock);
1854                 }
1855                 finish_wait(&mdev->misc_wait, &wait);
1856         }
1857
1858         list_add(&e->w.list, &mdev->active_ee);
1859         spin_unlock_irq(&mdev->req_lock);
1860
1861         if (mdev->state.conn == C_SYNC_TARGET)
1862                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));
1863
1864         switch (mdev->net_conf->wire_protocol) {
1865         case DRBD_PROT_C:
1866                 inc_unacked(mdev);
1867                 /* corresponding dec_unacked() in e_end_block()
1868                  * respective _drbd_clear_done_ee */
1869                 break;
1870         case DRBD_PROT_B:
1871                 /* I really don't like it that the receiver thread
1872                  * sends on the msock, but anyways */
1873                 drbd_send_ack(mdev, P_RECV_ACK, e);
1874                 break;
1875         case DRBD_PROT_A:
1876                 /* nothing to do */
1877                 break;
1878         }
1879
1880         if (mdev->state.pdsk < D_INCONSISTENT) {
1881                 /* In case we have the only disk of the cluster, */
1882                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1883                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1884                 e->flags &= ~EE_MAY_SET_IN_SYNC;
1885                 drbd_al_begin_io(mdev, e->sector);
1886         }
1887
1888         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1889                 return true;
1890
1891         /* don't care for the reason here */
1892         dev_err(DEV, "submit failed, triggering re-connect\n");
1893         spin_lock_irq(&mdev->req_lock);
1894         list_del(&e->w.list);
1895         hlist_del_init(&e->collision);
1896         spin_unlock_irq(&mdev->req_lock);
1897         if (e->flags & EE_CALL_AL_COMPLETE_IO)
1898                 drbd_al_complete_io(mdev, e->sector);
1899
1900 out_interrupted:
1901         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1902         put_ldev(mdev);
1903         drbd_free_ee(mdev, e);
1904         return false;
1905 }
1906
1907 /* We may throttle resync, if the lower device seems to be busy,
1908  * and current sync rate is above c_min_rate.
1909  *
1910  * To decide whether or not the lower device is busy, we use a scheme similar
1911  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1912  * (more than 64 sectors) of activity we cannot account for with our own resync
1913  * activity, it obviously is "busy".
1914  *
1915  * The current sync rate used here uses only the most recent two step marks,
1916  * to have a short time average so we can react faster.
1917  */
1918 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1919 {
1920         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1921         unsigned long db, dt, dbdt;
1922         struct lc_element *tmp;
1923         int curr_events;
1924         int throttle = 0;
1925
1926         /* feature disabled? */
1927         if (mdev->sync_conf.c_min_rate == 0)
1928                 return 0;
1929
1930         spin_lock_irq(&mdev->al_lock);
1931         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1932         if (tmp) {
1933                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1934                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1935                         spin_unlock_irq(&mdev->al_lock);
1936                         return 0;
1937                 }
1938                 /* Do not slow down if app IO is already waiting for this extent */
1939         }
1940         spin_unlock_irq(&mdev->al_lock);
1941
1942         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1943                       (int)part_stat_read(&disk->part0, sectors[1]) -
1944                         atomic_read(&mdev->rs_sect_ev);
1945
1946         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1947                 unsigned long rs_left;
1948                 int i;
1949
1950                 mdev->rs_last_events = curr_events;
1951
1952                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1953                  * approx. */
1954                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1955
1956                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1957                         rs_left = mdev->ov_left;
1958                 else
1959                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1960
1961                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1962                 if (!dt)
1963                         dt++;
1964                 db = mdev->rs_mark_left[i] - rs_left;
1965                 dbdt = Bit2KB(db/dt);
1966
1967                 if (dbdt > mdev->sync_conf.c_min_rate)
1968                         throttle = 1;
1969         }
1970         return throttle;
1971 }
1972
1973
1974 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1975 {
1976         sector_t sector;
1977         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1978         struct drbd_epoch_entry *e;
1979         struct digest_info *di = NULL;
1980         int size, verb;
1981         unsigned int fault_type;
1982         struct p_block_req *p = &mdev->data.rbuf.block_req;
1983
1984         sector = be64_to_cpu(p->sector);
1985         size   = be32_to_cpu(p->blksize);
1986
1987         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1988                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1989                                 (unsigned long long)sector, size);
1990                 return false;
1991         }
1992         if (sector + (size>>9) > capacity) {
1993                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1994                                 (unsigned long long)sector, size);
1995                 return false;
1996         }
1997
1998         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1999                 verb = 1;
2000                 switch (cmd) {
2001                 case P_DATA_REQUEST:
2002                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2003                         break;
2004                 case P_RS_DATA_REQUEST:
2005                 case P_CSUM_RS_REQUEST:
2006                 case P_OV_REQUEST:
2007                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2008                         break;
2009                 case P_OV_REPLY:
2010                         verb = 0;
2011                         dec_rs_pending(mdev);
2012                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2013                         break;
2014                 default:
2015                         dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2016                                 cmdname(cmd));
2017                 }
2018                 if (verb && __ratelimit(&drbd_ratelimit_state))
2019                         dev_err(DEV, "Can not satisfy peer's read request, "
2020                             "no local data.\n");
2021
2022                 /* drain possibly payload */
2023                 return drbd_drain_block(mdev, digest_size);
2024         }
2025
2026         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2027          * "criss-cross" setup, that might cause write-out on some other DRBD,
2028          * which in turn might block on the other node at this very place.  */
2029         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2030         if (!e) {
2031                 put_ldev(mdev);
2032                 return false;
2033         }
2034
2035         switch (cmd) {
2036         case P_DATA_REQUEST:
2037                 e->w.cb = w_e_end_data_req;
2038                 fault_type = DRBD_FAULT_DT_RD;
2039                 /* application IO, don't drbd_rs_begin_io */
2040                 goto submit;
2041
2042         case P_RS_DATA_REQUEST:
2043                 e->w.cb = w_e_end_rsdata_req;
2044                 fault_type = DRBD_FAULT_RS_RD;
2045                 /* used in the sector offset progress display */
2046                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2047                 break;
2048
2049         case P_OV_REPLY:
2050         case P_CSUM_RS_REQUEST:
2051                 fault_type = DRBD_FAULT_RS_RD;
2052                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2053                 if (!di)
2054                         goto out_free_e;
2055
2056                 di->digest_size = digest_size;
2057                 di->digest = (((char *)di)+sizeof(struct digest_info));
2058
2059                 e->digest = di;
2060                 e->flags |= EE_HAS_DIGEST;
2061
2062                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2063                         goto out_free_e;
2064
2065                 if (cmd == P_CSUM_RS_REQUEST) {
2066                         D_ASSERT(mdev->agreed_pro_version >= 89);
2067                         e->w.cb = w_e_end_csum_rs_req;
2068                         /* used in the sector offset progress display */
2069                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2070                 } else if (cmd == P_OV_REPLY) {
2071                         /* track progress, we may need to throttle */
2072                         atomic_add(size >> 9, &mdev->rs_sect_in);
2073                         e->w.cb = w_e_end_ov_reply;
2074                         dec_rs_pending(mdev);
2075                         /* drbd_rs_begin_io done when we sent this request,
2076                          * but accounting still needs to be done. */
2077                         goto submit_for_resync;
2078                 }
2079                 break;
2080
2081         case P_OV_REQUEST:
2082                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2083                     mdev->agreed_pro_version >= 90) {
2084                         unsigned long now = jiffies;
2085                         int i;
2086                         mdev->ov_start_sector = sector;
2087                         mdev->ov_position = sector;
2088                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2089                         mdev->rs_total = mdev->ov_left;
2090                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2091                                 mdev->rs_mark_left[i] = mdev->ov_left;
2092                                 mdev->rs_mark_time[i] = now;
2093                         }
2094                         dev_info(DEV, "Online Verify start sector: %llu\n",
2095                                         (unsigned long long)sector);
2096                 }
2097                 e->w.cb = w_e_end_ov_req;
2098                 fault_type = DRBD_FAULT_RS_RD;
2099                 break;
2100
2101         default:
2102                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2103                     cmdname(cmd));
2104                 fault_type = DRBD_FAULT_MAX;
2105                 goto out_free_e;
2106         }
2107
2108         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2109          * wrt the receiver, but it is not as straightforward as it may seem.
2110          * Various places in the resync start and stop logic assume resync
2111          * requests are processed in order, requeuing this on the worker thread
2112          * introduces a bunch of new code for synchronization between threads.
2113          *
2114          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2115          * "forever", throttling after drbd_rs_begin_io will lock that extent
2116          * for application writes for the same time.  For now, just throttle
2117          * here, where the rest of the code expects the receiver to sleep for
2118          * a while, anyways.
2119          */
2120
2121         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2122          * this defers syncer requests for some time, before letting at least
2123          * on request through.  The resync controller on the receiving side
2124          * will adapt to the incoming rate accordingly.
2125          *
2126          * We cannot throttle here if remote is Primary/SyncTarget:
2127          * we would also throttle its application reads.
2128          * In that case, throttling is done on the SyncTarget only.
2129          */
2130         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2131                 schedule_timeout_uninterruptible(HZ/10);
2132         if (drbd_rs_begin_io(mdev, sector))
2133                 goto out_free_e;
2134
2135 submit_for_resync:
2136         atomic_add(size >> 9, &mdev->rs_sect_ev);
2137
2138 submit:
2139         inc_unacked(mdev);
2140         spin_lock_irq(&mdev->req_lock);
2141         list_add_tail(&e->w.list, &mdev->read_ee);
2142         spin_unlock_irq(&mdev->req_lock);
2143
2144         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2145                 return true;
2146
2147         /* don't care for the reason here */
2148         dev_err(DEV, "submit failed, triggering re-connect\n");
2149         spin_lock_irq(&mdev->req_lock);
2150         list_del(&e->w.list);
2151         spin_unlock_irq(&mdev->req_lock);
2152         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2153
2154 out_free_e:
2155         put_ldev(mdev);
2156         drbd_free_ee(mdev, e);
2157         return false;
2158 }
2159
2160 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2161 {
2162         int self, peer, rv = -100;
2163         unsigned long ch_self, ch_peer;
2164
2165         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2166         peer = mdev->p_uuid[UI_BITMAP] & 1;
2167
2168         ch_peer = mdev->p_uuid[UI_SIZE];
2169         ch_self = mdev->comm_bm_set;
2170
2171         switch (mdev->net_conf->after_sb_0p) {
2172         case ASB_CONSENSUS:
2173         case ASB_DISCARD_SECONDARY:
2174         case ASB_CALL_HELPER:
2175                 dev_err(DEV, "Configuration error.\n");
2176                 break;
2177         case ASB_DISCONNECT:
2178                 break;
2179         case ASB_DISCARD_YOUNGER_PRI:
2180                 if (self == 0 && peer == 1) {
2181                         rv = -1;
2182                         break;
2183                 }
2184                 if (self == 1 && peer == 0) {
2185                         rv =  1;
2186                         break;
2187                 }
2188                 /* Else fall through to one of the other strategies... */
2189         case ASB_DISCARD_OLDER_PRI:
2190                 if (self == 0 && peer == 1) {
2191                         rv = 1;
2192                         break;
2193                 }
2194                 if (self == 1 && peer == 0) {
2195                         rv = -1;
2196                         break;
2197                 }
2198                 /* Else fall through to one of the other strategies... */
2199                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2200                      "Using discard-least-changes instead\n");
2201         case ASB_DISCARD_ZERO_CHG:
2202                 if (ch_peer == 0 && ch_self == 0) {
2203                         rv = drbd_test_flag(mdev, DISCARD_CONCURRENT)
2204                                 ? -1 : 1;
2205                         break;
2206                 } else {
2207                         if (ch_peer == 0) { rv =  1; break; }
2208                         if (ch_self == 0) { rv = -1; break; }
2209                 }
2210                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2211                         break;
2212         case ASB_DISCARD_LEAST_CHG:
2213                 if      (ch_self < ch_peer)
2214                         rv = -1;
2215                 else if (ch_self > ch_peer)
2216                         rv =  1;
2217                 else /* ( ch_self == ch_peer ) */
2218                      /* Well, then use something else. */
2219                         rv = drbd_test_flag(mdev, DISCARD_CONCURRENT)
2220                                 ? -1 : 1;
2221                 break;
2222         case ASB_DISCARD_LOCAL:
2223                 rv = -1;
2224                 break;
2225         case ASB_DISCARD_REMOTE:
2226                 rv =  1;
2227         }
2228
2229         return rv;
2230 }
2231
2232 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2233 {
2234         int hg, rv = -100;
2235
2236         switch (mdev->net_conf->after_sb_1p) {
2237         case ASB_DISCARD_YOUNGER_PRI:
2238         case ASB_DISCARD_OLDER_PRI:
2239         case ASB_DISCARD_LEAST_CHG:
2240         case ASB_DISCARD_LOCAL:
2241         case ASB_DISCARD_REMOTE:
2242                 dev_err(DEV, "Configuration error.\n");
2243                 break;
2244         case ASB_DISCONNECT:
2245                 break;
2246         case ASB_CONSENSUS:
2247                 hg = drbd_asb_recover_0p(mdev);
2248                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2249                         rv = hg;
2250                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2251                         rv = hg;
2252                 break;
2253         case ASB_VIOLENTLY:
2254                 rv = drbd_asb_recover_0p(mdev);
2255                 break;
2256         case ASB_DISCARD_SECONDARY:
2257                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2258         case ASB_CALL_HELPER:
2259                 hg = drbd_asb_recover_0p(mdev);
2260                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2261                         enum drbd_state_rv rv2;
2262
2263                         drbd_set_role(mdev, R_SECONDARY, 0);
2264                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2265                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2266                           * we do not need to wait for the after state change work either. */
2267                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2268                         if (rv2 != SS_SUCCESS) {
2269                                 drbd_khelper(mdev, "pri-lost-after-sb");
2270                         } else {
2271                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2272                                 rv = hg;
2273                         }
2274                 } else
2275                         rv = hg;
2276         }
2277
2278         return rv;
2279 }
2280
2281 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2282 {
2283         int hg, rv = -100;
2284
2285         switch (mdev->net_conf->after_sb_2p) {
2286         case ASB_DISCARD_YOUNGER_PRI:
2287         case ASB_DISCARD_OLDER_PRI:
2288         case ASB_DISCARD_LEAST_CHG:
2289         case ASB_DISCARD_LOCAL:
2290         case ASB_DISCARD_REMOTE:
2291         case ASB_CONSENSUS:
2292         case ASB_DISCARD_SECONDARY:
2293                 dev_err(DEV, "Configuration error.\n");
2294                 break;
2295         case ASB_VIOLENTLY:
2296                 rv = drbd_asb_recover_0p(mdev);
2297                 break;
2298         case ASB_DISCONNECT:
2299                 break;
2300         case ASB_CALL_HELPER:
2301                 hg = drbd_asb_recover_0p(mdev);
2302                 if (hg == -1) {
2303                         enum drbd_state_rv rv2;
2304
2305                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2306                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2307                           * we do not need to wait for the after state change work either. */
2308                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2309                         if (rv2 != SS_SUCCESS) {
2310                                 drbd_khelper(mdev, "pri-lost-after-sb");
2311                         } else {
2312                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2313                                 rv = hg;
2314                         }
2315                 } else
2316                         rv = hg;
2317         }
2318
2319         return rv;
2320 }
2321
2322 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2323                            u64 bits, u64 flags)
2324 {
2325         if (!uuid) {
2326                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2327                 return;
2328         }
2329         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2330              text,
2331              (unsigned long long)uuid[UI_CURRENT],
2332              (unsigned long long)uuid[UI_BITMAP],
2333              (unsigned long long)uuid[UI_HISTORY_START],
2334              (unsigned long long)uuid[UI_HISTORY_END],
2335              (unsigned long long)bits,
2336              (unsigned long long)flags);
2337 }
2338
2339 /*
2340   100   after split brain try auto recover
2341     2   C_SYNC_SOURCE set BitMap
2342     1   C_SYNC_SOURCE use BitMap
2343     0   no Sync
2344    -1   C_SYNC_TARGET use BitMap
2345    -2   C_SYNC_TARGET set BitMap
2346  -100   after split brain, disconnect
2347 -1000   unrelated data
2348 -1091   requires proto 91
2349 -1096   requires proto 96
2350  */
2351 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2352 {
2353         u64 self, peer;
2354         int i, j;
2355
2356         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2357         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2358
2359         *rule_nr = 10;
2360         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2361                 return 0;
2362
2363         *rule_nr = 20;
2364         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2365              peer != UUID_JUST_CREATED)
2366                 return -2;
2367
2368         *rule_nr = 30;
2369         if (self != UUID_JUST_CREATED &&
2370             (peer == UUID_JUST_CREATED || peer == (u64)0))
2371                 return 2;
2372
2373         if (self == peer) {
2374                 int rct, dc; /* roles at crash time */
2375
2376                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2377
2378                         if (mdev->agreed_pro_version < 91)
2379                                 return -1091;
2380
2381                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2382                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2383                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2384                                 drbd_uuid_move_history(mdev);
2385                                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2386                                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2387
2388                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2389                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2390                                 *rule_nr = 34;
2391                         } else {
2392                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2393                                 *rule_nr = 36;
2394                         }
2395
2396                         return 1;
2397                 }
2398
2399                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2400
2401                         if (mdev->agreed_pro_version < 91)
2402                                 return -1091;
2403
2404                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2405                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2406                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2407
2408                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2409                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2410                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2411
2412                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2413                                 *rule_nr = 35;
2414                         } else {
2415                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2416                                 *rule_nr = 37;
2417                         }
2418
2419                         return -1;
2420                 }
2421
2422                 /* Common power [off|failure] */
2423                 rct = (drbd_test_flag(mdev, CRASHED_PRIMARY) ? 1 : 0) +
2424                         (mdev->p_uuid[UI_FLAGS] & 2);
2425                 /* lowest bit is set when we were primary,
2426                  * next bit (weight 2) is set when peer was primary */
2427                 *rule_nr = 40;
2428
2429                 switch (rct) {
2430                 case 0: /* !self_pri && !peer_pri */ return 0;
2431                 case 1: /*  self_pri && !peer_pri */ return 1;
2432                 case 2: /* !self_pri &&  peer_pri */ return -1;
2433                 case 3: /*  self_pri &&  peer_pri */
2434                         dc = drbd_test_flag(mdev, DISCARD_CONCURRENT);
2435                         return dc ? -1 : 1;
2436                 }
2437         }
2438
2439         *rule_nr = 50;
2440         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2441         if (self == peer)
2442                 return -1;
2443
2444         *rule_nr = 51;
2445         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2446         if (self == peer) {
2447                 if (mdev->agreed_pro_version < 96 ?
2448                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2449                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2450                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2451                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2452                            resync as sync source modifications of the peer's UUIDs. */
2453
2454                         if (mdev->agreed_pro_version < 91)
2455                                 return -1091;
2456
2457                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2458                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2459
2460                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2461                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2462
2463                         return -1;
2464                 }
2465         }
2466
2467         *rule_nr = 60;
2468         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2469         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2470                 peer = mdev->p_uuid[i] & ~((u64)1);
2471                 if (self == peer)
2472                         return -2;
2473         }
2474
2475         *rule_nr = 70;
2476         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2477         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2478         if (self == peer)
2479                 return 1;
2480
2481         *rule_nr = 71;
2482         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2483         if (self == peer) {
2484                 if (mdev->agreed_pro_version < 96 ?
2485                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2486                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2487                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2488                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2489                            resync as sync source modifications of our UUIDs. */
2490
2491                         if (mdev->agreed_pro_version < 91)
2492                                 return -1091;
2493
2494                         __drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2495                         __drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2496
2497                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2498                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2499                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2500
2501                         return 1;
2502                 }
2503         }
2504
2505
2506         *rule_nr = 80;
2507         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2508         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2509                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2510                 if (self == peer)
2511                         return 2;
2512         }
2513
2514         *rule_nr = 90;
2515         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2516         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2517         if (self == peer && self != ((u64)0))
2518                 return 100;
2519
2520         *rule_nr = 100;
2521         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2522                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2523                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2524                         peer = mdev->p_uuid[j] & ~((u64)1);
2525                         if (self == peer)
2526                                 return -100;
2527                 }
2528         }
2529
2530         return -1000;
2531 }
2532
2533 /* drbd_sync_handshake() returns the new conn state on success, or
2534    CONN_MASK (-1) on failure.
2535  */
2536 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2537                                            enum drbd_disk_state peer_disk) __must_hold(local)
2538 {
2539         int hg, rule_nr;
2540         enum drbd_conns rv = C_MASK;
2541         enum drbd_disk_state mydisk;
2542
2543         mydisk = mdev->state.disk;
2544         if (mydisk == D_NEGOTIATING)
2545                 mydisk = mdev->new_state_tmp.disk;
2546
2547         dev_info(DEV, "drbd_sync_handshake:\n");
2548
2549         spin_lock_irq(&mdev->ldev->md.uuid_lock);
2550         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2551         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2552                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2553
2554         hg = drbd_uuid_compare(mdev, &rule_nr);
2555         spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2556
2557         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2558
2559         if (hg == -1000) {
2560                 dev_alert(DEV, "Unrelated data, aborting!\n");
2561                 return C_MASK;
2562         }
2563         if (hg < -1000) {
2564                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2565                 return C_MASK;
2566         }
2567
2568         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2569             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2570                 int f = (hg == -100) || abs(hg) == 2;
2571                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2572                 if (f)
2573                         hg = hg*2;
2574                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2575                      hg > 0 ? "source" : "target");
2576         }
2577
2578         if (abs(hg) == 100)
2579                 drbd_khelper(mdev, "initial-split-brain");
2580
2581         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2582                 int pcount = (mdev->state.role == R_PRIMARY)
2583                            + (peer_role == R_PRIMARY);
2584                 int forced = (hg == -100);
2585
2586                 switch (pcount) {
2587                 case 0:
2588                         hg = drbd_asb_recover_0p(mdev);
2589                         break;
2590                 case 1:
2591                         hg = drbd_asb_recover_1p(mdev);
2592                         break;
2593                 case 2:
2594                         hg = drbd_asb_recover_2p(mdev);
2595                         break;
2596                 }
2597                 if (abs(hg) < 100) {
2598                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2599                              "automatically solved. Sync from %s node\n",
2600                              pcount, (hg < 0) ? "peer" : "this");
2601                         if (forced) {
2602                                 dev_warn(DEV, "Doing a full sync, since"
2603                                      " UUIDs where ambiguous.\n");
2604                                 hg = hg*2;
2605                         }
2606                 }
2607         }
2608
2609         if (hg == -100) {
2610                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2611                         hg = -1;
2612                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2613                         hg = 1;
2614
2615                 if (abs(hg) < 100)
2616                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2617                              "Sync from %s node\n",
2618                              (hg < 0) ? "peer" : "this");
2619         }
2620
2621         if (hg == -100) {
2622                 /* FIXME this log message is not correct if we end up here
2623                  * after an attempted attach on a diskless node.
2624                  * We just refuse to attach -- well, we drop the "connection"
2625                  * to that disk, in a way... */
2626                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2627                 drbd_khelper(mdev, "split-brain");
2628                 return C_MASK;
2629         }
2630
2631         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2632                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2633                 return C_MASK;
2634         }
2635
2636         if (hg < 0 && /* by intention we do not use mydisk here. */
2637             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2638                 switch (mdev->net_conf->rr_conflict) {
2639                 case ASB_CALL_HELPER:
2640                         drbd_khelper(mdev, "pri-lost");
2641                         /* fall through */
2642                 case ASB_DISCONNECT:
2643                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2644                         return C_MASK;
2645                 case ASB_VIOLENTLY:
2646                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2647                              "assumption\n");
2648                 }
2649         }
2650
2651         if (mdev->net_conf->dry_run || drbd_test_flag(mdev, CONN_DRY_RUN)) {
2652                 if (hg == 0)
2653                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2654                 else
2655                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2656                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2657                                  abs(hg) >= 2 ? "full" : "bit-map based");
2658                 return C_MASK;
2659         }
2660
2661         if (abs(hg) >= 2) {
2662                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2663                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2664                                         BM_LOCKED_SET_ALLOWED))
2665                         return C_MASK;
2666         }
2667
2668         if (hg > 0) { /* become sync source. */
2669                 rv = C_WF_BITMAP_S;
2670         } else if (hg < 0) { /* become sync target */
2671                 rv = C_WF_BITMAP_T;
2672         } else {
2673                 rv = C_CONNECTED;
2674                 if (drbd_bm_total_weight(mdev)) {
2675                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2676                              drbd_bm_total_weight(mdev));
2677                 }
2678         }
2679
2680         return rv;
2681 }
2682
2683 /* returns 1 if invalid */
2684 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2685 {
2686         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2687         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2688             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2689                 return 0;
2690
2691         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2692         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2693             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2694                 return 1;
2695
2696         /* everything else is valid if they are equal on both sides. */
2697         if (peer == self)
2698                 return 0;
2699
2700         /* everything es is invalid. */
2701         return 1;
2702 }
2703
2704 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2705 {
2706         struct p_protocol *p = &mdev->data.rbuf.protocol;
2707         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2708         int p_want_lose, p_two_primaries, cf;
2709         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2710
2711         p_proto         = be32_to_cpu(p->protocol);
2712         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2713         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2714         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2715         p_two_primaries = be32_to_cpu(p->two_primaries);
2716         cf              = be32_to_cpu(p->conn_flags);
2717         p_want_lose = cf & CF_WANT_LOSE;
2718
2719         drbd_clear_flag(mdev, CONN_DRY_RUN);
2720
2721         if (cf & CF_DRY_RUN)
2722                 drbd_set_flag(mdev, CONN_DRY_RUN);
2723
2724         if (p_proto != mdev->net_conf->wire_protocol) {
2725                 dev_err(DEV, "incompatible communication protocols\n");
2726                 goto disconnect;
2727         }
2728
2729         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2730                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2731                 goto disconnect;
2732         }
2733
2734         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2735                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2736                 goto disconnect;
2737         }
2738
2739         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2740                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2741                 goto disconnect;
2742         }
2743
2744         if (p_want_lose && mdev->net_conf->want_lose) {
2745                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2746                 goto disconnect;
2747         }
2748
2749         if (p_two_primaries != mdev->net_conf->two_primaries) {
2750                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2751                 goto disconnect;
2752         }
2753
2754         if (mdev->agreed_pro_version >= 87) {
2755                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2756
2757                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2758                         return false;
2759
2760                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2761                 if (strcmp(p_integrity_alg, my_alg)) {
2762                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2763                         goto disconnect;
2764                 }
2765                 dev_info(DEV, "data-integrity-alg: %s\n",
2766                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2767         }
2768
2769         return true;
2770
2771 disconnect:
2772         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2773         return false;
2774 }
2775
2776 /* helper function
2777  * input: alg name, feature name
2778  * return: NULL (alg name was "")
2779  *         ERR_PTR(error) if something goes wrong
2780  *         or the crypto hash ptr, if it worked out ok. */
2781 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2782                 const char *alg, const char *name)
2783 {
2784         struct crypto_hash *tfm;
2785
2786         if (!alg[0])
2787                 return NULL;
2788
2789         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2790         if (IS_ERR(tfm)) {
2791                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2792                         alg, name, PTR_ERR(tfm));
2793                 return tfm;
2794         }
2795         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2796                 crypto_free_hash(tfm);
2797                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2798                 return ERR_PTR(-EINVAL);
2799         }
2800         return tfm;
2801 }
2802
2803 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2804 {
2805         int ok = true;
2806         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2807         unsigned int header_size, data_size, exp_max_sz;
2808         struct crypto_hash *verify_tfm = NULL;
2809         struct crypto_hash *csums_tfm = NULL;
2810         const int apv = mdev->agreed_pro_version;
2811         int *rs_plan_s = NULL;
2812         int fifo_size = 0;
2813
2814         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2815                     : apv == 88 ? sizeof(struct p_rs_param)
2816                                         + SHARED_SECRET_MAX
2817                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2818                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2819
2820         if (packet_size > exp_max_sz) {
2821                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2822                     packet_size, exp_max_sz);
2823                 return false;
2824         }
2825
2826         if (apv <= 88) {
2827                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2828                 data_size   = packet_size  - header_size;
2829         } else if (apv <= 94) {
2830                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2831                 data_size   = packet_size  - header_size;
2832                 D_ASSERT(data_size == 0);
2833         } else {
2834                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2835                 data_size   = packet_size  - header_size;
2836                 D_ASSERT(data_size == 0);
2837         }
2838
2839         /* initialize verify_alg and csums_alg */
2840         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2841
2842         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2843                 return false;
2844
2845         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2846
2847         if (apv >= 88) {
2848                 if (apv == 88) {
2849                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2850                                 dev_err(DEV, "verify-alg of wrong size, "
2851                                         "peer wants %u, accepting only up to %u byte\n",
2852                                         data_size, SHARED_SECRET_MAX);
2853                                 return false;
2854                         }
2855
2856                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2857                                 return false;
2858
2859                         /* we expect NUL terminated string */
2860                         /* but just in case someone tries to be evil */
2861                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2862                         p->verify_alg[data_size-1] = 0;
2863
2864                 } else /* apv >= 89 */ {
2865                         /* we still expect NUL terminated strings */
2866                         /* but just in case someone tries to be evil */
2867                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2868                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2869                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2870                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2871                 }
2872
2873                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2874                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2875                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2876                                     mdev->sync_conf.verify_alg, p->verify_alg);
2877                                 goto disconnect;
2878                         }
2879                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2880                                         p->verify_alg, "verify-alg");
2881                         if (IS_ERR(verify_tfm)) {
2882                                 verify_tfm = NULL;
2883                                 goto disconnect;
2884                         }
2885                 }
2886
2887                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2888                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2889                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2890                                     mdev->sync_conf.csums_alg, p->csums_alg);
2891                                 goto disconnect;
2892                         }
2893                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2894                                         p->csums_alg, "csums-alg");
2895                         if (IS_ERR(csums_tfm)) {
2896                                 csums_tfm = NULL;
2897                                 goto disconnect;
2898                         }
2899                 }
2900
2901                 if (apv > 94) {
2902                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2903                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2904                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2905                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2906                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2907
2908                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2909                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2910                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_NOIO);
2911                                 if (!rs_plan_s) {
2912                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
2913                                         goto disconnect;
2914                                 }
2915                         }
2916                 }
2917
2918                 spin_lock(&mdev->peer_seq_lock);
2919                 /* lock against drbd_nl_syncer_conf() */
2920                 if (verify_tfm) {
2921                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2922                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2923                         crypto_free_hash(mdev->verify_tfm);
2924                         mdev->verify_tfm = verify_tfm;
2925                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2926                 }
2927                 if (csums_tfm) {
2928                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2929                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2930                         crypto_free_hash(mdev->csums_tfm);
2931                         mdev->csums_tfm = csums_tfm;
2932                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2933                 }
2934                 if (fifo_size != mdev->rs_plan_s.size) {
2935                         kfree(mdev->rs_plan_s.values);
2936                         mdev->rs_plan_s.values = rs_plan_s;
2937                         mdev->rs_plan_s.size   = fifo_size;
2938                         mdev->rs_planed = 0;
2939                 }
2940                 spin_unlock(&mdev->peer_seq_lock);
2941         }
2942
2943         return ok;
2944 disconnect:
2945         /* just for completeness: actually not needed,
2946          * as this is not reached if csums_tfm was ok. */
2947         crypto_free_hash(csums_tfm);
2948         /* but free the verify_tfm again, if csums_tfm did not work out */
2949         crypto_free_hash(verify_tfm);
2950         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2951         return false;
2952 }
2953
2954 /* warn if the arguments differ by more than 12.5% */
2955 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2956         const char *s, sector_t a, sector_t b)
2957 {
2958         sector_t d;
2959         if (a == 0 || b == 0)
2960                 return;
2961         d = (a > b) ? (a - b) : (b - a);
2962         if (d > (a>>3) || d > (b>>3))
2963                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2964                      (unsigned long long)a, (unsigned long long)b);
2965 }
2966
2967 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2968 {
2969         struct p_sizes *p = &mdev->data.rbuf.sizes;
2970         enum determine_dev_size dd = unchanged;
2971         sector_t p_size, p_usize, my_usize;
2972         int ldsc = 0; /* local disk size changed */
2973         enum dds_flags ddsf;
2974
2975         p_size = be64_to_cpu(p->d_size);
2976         p_usize = be64_to_cpu(p->u_size);
2977
2978         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2979                 dev_err(DEV, "some backing storage is needed\n");
2980                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2981                 return false;
2982         }
2983
2984         /* just store the peer's disk size for now.
2985          * we still need to figure out whether we accept that. */
2986         mdev->p_size = p_size;
2987
2988         if (get_ldev(mdev)) {
2989                 warn_if_differ_considerably(mdev, "lower level device sizes",
2990                            p_size, drbd_get_max_capacity(mdev->ldev));
2991                 warn_if_differ_considerably(mdev, "user requested size",
2992                                             p_usize, mdev->ldev->dc.disk_size);
2993
2994                 /* if this is the first connect, or an otherwise expected
2995                  * param exchange, choose the minimum */
2996                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2997                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2998                                              p_usize);
2999
3000                 my_usize = mdev->ldev->dc.disk_size;
3001
3002                 if (mdev->ldev->dc.disk_size != p_usize) {
3003                         mdev->ldev->dc.disk_size = p_usize;
3004                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3005                              (unsigned long)mdev->ldev->dc.disk_size);
3006                 }
3007
3008                 /* Never shrink a device with usable data during connect.
3009                    But allow online shrinking if we are connected. */
3010                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3011                    drbd_get_capacity(mdev->this_bdev) &&
3012                    mdev->state.disk >= D_OUTDATED &&
3013                    mdev->state.conn < C_CONNECTED) {
3014                         dev_err(DEV, "The peer's disk size is too small!\n");
3015                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3016                         mdev->ldev->dc.disk_size = my_usize;
3017                         put_ldev(mdev);
3018                         return false;
3019                 }
3020                 put_ldev(mdev);
3021         }
3022
3023         ddsf = be16_to_cpu(p->dds_flags);
3024         if (get_ldev(mdev)) {
3025                 dd = drbd_determine_dev_size(mdev, ddsf);
3026                 put_ldev(mdev);
3027                 if (dd == dev_size_error)
3028                         return false;
3029                 drbd_md_sync(mdev);
3030         } else {
3031                 /* I am diskless, need to accept the peer's size. */
3032                 drbd_set_my_capacity(mdev, p_size);
3033         }
3034
3035         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3036         drbd_reconsider_max_bio_size(mdev);
3037
3038         if (get_ldev(mdev)) {
3039                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3040                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3041                         ldsc = 1;
3042                 }
3043
3044                 put_ldev(mdev);
3045         }
3046
3047         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3048                 if (be64_to_cpu(p->c_size) !=
3049                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3050                         /* we have different sizes, probably peer
3051                          * needs to know my new size... */
3052                         drbd_send_sizes(mdev, 0, ddsf);
3053                 }
3054                 if (drbd_test_and_clear_flag(mdev, RESIZE_PENDING) ||
3055                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3056                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3057                             mdev->state.disk >= D_INCONSISTENT) {
3058                                 if (ddsf & DDSF_NO_RESYNC)
3059                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3060                                 else
3061                                         resync_after_online_grow(mdev);
3062                         } else
3063                                 drbd_set_flag(mdev, RESYNC_AFTER_NEG);
3064                 }
3065         }
3066
3067         return true;
3068 }
3069
3070 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3071 {
3072         struct p_uuids *p = &mdev->data.rbuf.uuids;
3073         u64 *p_uuid;
3074         int i, updated_uuids = 0;
3075
3076         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3077
3078         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3079                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3080
3081         kfree(mdev->p_uuid);
3082         mdev->p_uuid = p_uuid;
3083
3084         if (mdev->state.conn < C_CONNECTED &&
3085             mdev->state.disk < D_INCONSISTENT &&
3086             mdev->state.role == R_PRIMARY &&
3087             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3088                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3089                     (unsigned long long)mdev->ed_uuid);
3090                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3091                 return false;
3092         }
3093
3094         if (get_ldev(mdev)) {
3095                 int skip_initial_sync =
3096                         mdev->state.conn == C_CONNECTED &&
3097                         mdev->agreed_pro_version >= 90 &&
3098                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3099                         (p_uuid[UI_FLAGS] & 8);
3100                 if (skip_initial_sync) {
3101                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3102                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3103                                         "clear_n_write from receive_uuids",
3104                                         BM_LOCKED_TEST_ALLOWED);
3105                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3106                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3107                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3108                                         CS_VERBOSE, NULL);
3109                         drbd_md_sync(mdev);
3110                         updated_uuids = 1;
3111                 }
3112                 put_ldev(mdev);
3113         } else if (mdev->state.disk < D_INCONSISTENT &&
3114                    mdev->state.role == R_PRIMARY) {
3115                 /* I am a diskless primary, the peer just created a new current UUID
3116                    for me. */
3117                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3118         }
3119
3120         /* Before we test for the disk state, we should wait until an eventually
3121            ongoing cluster wide state change is finished. That is important if
3122            we are primary and are detaching from our disk. We need to see the
3123            new disk state... */
3124         wait_event(mdev->misc_wait, !drbd_test_flag(mdev, CLUSTER_ST_CHANGE));
3125         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3126                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3127
3128         if (updated_uuids)
3129                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3130
3131         return true;
3132 }
3133
3134 /**
3135  * convert_state() - Converts the peer's view of the cluster state to our point of view
3136  * @ps:         The state as seen by the peer.
3137  */
3138 static union drbd_state convert_state(union drbd_state ps)
3139 {
3140         union drbd_state ms;
3141
3142         static enum drbd_conns c_tab[] = {
3143                 [C_CONNECTED] = C_CONNECTED,
3144
3145                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3146                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3147                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3148                 [C_VERIFY_S]       = C_VERIFY_T,
3149                 [C_MASK]   = C_MASK,
3150         };
3151
3152         ms.i = ps.i;
3153
3154         ms.conn = c_tab[ps.conn];
3155         ms.peer = ps.role;
3156         ms.role = ps.peer;
3157         ms.pdsk = ps.disk;
3158         ms.disk = ps.pdsk;
3159         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3160
3161         return ms;
3162 }
3163
3164 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3165 {
3166         struct p_req_state *p = &mdev->data.rbuf.req_state;
3167         union drbd_state mask, val;
3168         enum drbd_state_rv rv;
3169
3170         mask.i = be32_to_cpu(p->mask);
3171         val.i = be32_to_cpu(p->val);
3172
3173         if (drbd_test_flag(mdev, DISCARD_CONCURRENT) &&
3174             drbd_test_flag(mdev, CLUSTER_ST_CHANGE)) {
3175                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3176                 return true;
3177         }
3178
3179         mask = convert_state(mask);
3180         val = convert_state(val);
3181
3182         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3183
3184         drbd_send_sr_reply(mdev, rv);
3185         drbd_md_sync(mdev);
3186
3187         return true;
3188 }
3189
3190 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3191 {
3192         struct p_state *p = &mdev->data.rbuf.state;
3193         union drbd_state os, ns, peer_state;
3194         enum drbd_disk_state real_peer_disk;
3195         enum chg_state_flags cs_flags;
3196         int rv;
3197
3198         peer_state.i = be32_to_cpu(p->state);
3199
3200         real_peer_disk = peer_state.disk;
3201         if (peer_state.disk == D_NEGOTIATING) {
3202                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3203                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3204         }
3205
3206         spin_lock_irq(&mdev->req_lock);
3207  retry:
3208         os = ns = mdev->state;
3209         spin_unlock_irq(&mdev->req_lock);
3210
3211         /* If some other part of the code (asender thread, timeout)
3212          * already decided to close the connection again,
3213          * we must not "re-establish" it here. */
3214         if (os.conn <= C_TEAR_DOWN)
3215                 return false;
3216
3217         /* If this is the "end of sync" confirmation, usually the peer disk
3218          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3219          * set) resync started in PausedSyncT, or if the timing of pause-/
3220          * unpause-sync events has been "just right", the peer disk may
3221          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3222          */
3223         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3224             real_peer_disk == D_UP_TO_DATE &&
3225             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3226                 /* If we are (becoming) SyncSource, but peer is still in sync
3227                  * preparation, ignore its uptodate-ness to avoid flapping, it
3228                  * will change to inconsistent once the peer reaches active
3229                  * syncing states.
3230                  * It may have changed syncer-paused flags, however, so we
3231                  * cannot ignore this completely. */
3232                 if (peer_state.conn > C_CONNECTED &&
3233                     peer_state.conn < C_SYNC_SOURCE)
3234                         real_peer_disk = D_INCONSISTENT;
3235
3236                 /* if peer_state changes to connected at the same time,
3237                  * it explicitly notifies us that it finished resync.
3238                  * Maybe we should finish it up, too? */
3239                 else if (os.conn >= C_SYNC_SOURCE &&
3240                          peer_state.conn == C_CONNECTED) {
3241                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3242                                 drbd_resync_finished(mdev);
3243                         return true;
3244                 }
3245         }
3246
3247         /* explicit verify finished notification, stop sector reached. */
3248         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3249             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3250                 ov_oos_print(mdev);
3251                 drbd_resync_finished(mdev);
3252                 return true;
3253         }
3254
3255         /* peer says his disk is inconsistent, while we think it is uptodate,
3256          * and this happens while the peer still thinks we have a sync going on,
3257          * but we think we are already done with the sync.
3258          * We ignore this to avoid flapping pdsk.
3259          * This should not happen, if the peer is a recent version of drbd. */
3260         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3261             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3262                 real_peer_disk = D_UP_TO_DATE;
3263
3264         if (ns.conn == C_WF_REPORT_PARAMS)
3265                 ns.conn = C_CONNECTED;
3266
3267         if (peer_state.conn == C_AHEAD)
3268                 ns.conn = C_BEHIND;
3269
3270         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3271             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3272                 int cr; /* consider resync */
3273
3274                 /* if we established a new connection */
3275                 cr  = (os.conn < C_CONNECTED);
3276                 /* if we had an established connection
3277                  * and one of the nodes newly attaches a disk */
3278                 cr |= (os.conn == C_CONNECTED &&
3279                        (peer_state.disk == D_NEGOTIATING ||
3280                         os.disk == D_NEGOTIATING));
3281                 /* if we have both been inconsistent, and the peer has been
3282                  * forced to be UpToDate with --overwrite-data */
3283                 cr |= drbd_test_flag(mdev, CONSIDER_RESYNC);
3284                 /* if we had been plain connected, and the admin requested to
3285                  * start a sync by "invalidate" or "invalidate-remote" */
3286                 cr |= (os.conn == C_CONNECTED &&
3287                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3288                                  peer_state.conn <= C_WF_BITMAP_T));
3289
3290                 if (cr)
3291                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3292
3293                 put_ldev(mdev);
3294                 if (ns.conn == C_MASK) {
3295                         ns.conn = C_CONNECTED;
3296                         if (mdev->state.disk == D_NEGOTIATING) {
3297                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3298                         } else if (peer_state.disk == D_NEGOTIATING) {
3299                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3300                                 peer_state.disk = D_DISKLESS;
3301                                 real_peer_disk = D_DISKLESS;
3302                         } else {
3303                                 if (drbd_test_and_clear_flag(mdev, CONN_DRY_RUN))
3304                                         return false;
3305                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3306                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3307                                 return false;
3308                         }
3309                 }
3310         }
3311
3312         spin_lock_irq(&mdev->req_lock);
3313         if (mdev->state.i != os.i)
3314                 goto retry;
3315         drbd_clear_flag(mdev, CONSIDER_RESYNC);
3316         ns.peer = peer_state.role;
3317         ns.pdsk = real_peer_disk;
3318         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3319         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3320                 ns.disk = mdev->new_state_tmp.disk;
3321         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3322         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3323             drbd_test_flag(mdev, NEW_CUR_UUID)) {
3324                 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3325                    for temporal network outages! */
3326                 spin_unlock_irq(&mdev->req_lock);
3327                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3328                 tl_clear(mdev);
3329                 drbd_uuid_new_current(mdev);
3330                 drbd_clear_flag(mdev, NEW_CUR_UUID);
3331                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3332                 return false;
3333         }
3334         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3335         ns = mdev->state;
3336         spin_unlock_irq(&mdev->req_lock);
3337
3338         if (rv < SS_SUCCESS) {
3339                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3340                 return false;
3341         }
3342
3343         if (os.conn > C_WF_REPORT_PARAMS) {
3344                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3345                     peer_state.disk != D_NEGOTIATING ) {
3346                         /* we want resync, peer has not yet decided to sync... */
3347                         /* Nowadays only used when forcing a node into primary role and
3348                            setting its disk to UpToDate with that */
3349                         drbd_send_uuids(mdev);
3350                         drbd_send_current_state(mdev);
3351                 }
3352         }
3353
3354         mdev->net_conf->want_lose = 0;
3355
3356         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3357
3358         return true;
3359 }
3360
3361 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3362 {
3363         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3364
3365         wait_event(mdev->misc_wait,
3366                    mdev->state.conn == C_WF_SYNC_UUID ||
3367                    mdev->state.conn == C_BEHIND ||
3368                    mdev->state.conn < C_CONNECTED ||
3369                    mdev->state.disk < D_NEGOTIATING);
3370
3371         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3372
3373         /* Here the _drbd_uuid_ functions are right, current should
3374            _not_ be rotated into the history */
3375         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3376                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3377                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3378
3379                 drbd_print_uuids(mdev, "updated sync uuid");
3380                 drbd_start_resync(mdev, C_SYNC_TARGET);
3381
3382                 put_ldev(mdev);
3383         } else
3384                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3385
3386         return true;
3387 }
3388
3389 /**
3390  * receive_bitmap_plain
3391  *
3392  * Return 0 when done, 1 when another iteration is needed, and a negative error
3393  * code upon failure.
3394  */
3395 static int
3396 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3397                      unsigned long *buffer, struct bm_xfer_ctx *c)
3398 {
3399         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3400         unsigned want = num_words * sizeof(long);
3401         int err;
3402
3403         if (want != data_size) {
3404                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3405                 return -EIO;
3406         }
3407         if (want == 0)
3408                 return 0;
3409         err = drbd_recv(mdev, buffer, want);
3410         if (err != want) {
3411                 if (err >= 0)
3412                         err = -EIO;
3413                 return err;
3414         }
3415
3416         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3417
3418         c->word_offset += num_words;
3419         c->bit_offset = c->word_offset * BITS_PER_LONG;
3420         if (c->bit_offset > c->bm_bits)
3421                 c->bit_offset = c->bm_bits;
3422
3423         return 1;
3424 }
3425
3426 /**
3427  * recv_bm_rle_bits
3428  *
3429  * Return 0 when done, 1 when another iteration is needed, and a negative error
3430  * code upon failure.
3431  */
3432 static int
3433 recv_bm_rle_bits(struct drbd_conf *mdev,
3434                 struct p_compressed_bm *p,
3435                 struct bm_xfer_ctx *c)
3436 {
3437         struct bitstream bs;
3438         u64 look_ahead;
3439         u64 rl;
3440         u64 tmp;
3441         unsigned long s = c->bit_offset;
3442         unsigned long e;
3443         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3444         int toggle = DCBP_get_start(p);
3445         int have;
3446         int bits;
3447
3448         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3449
3450         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3451         if (bits < 0)
3452                 return -EIO;
3453
3454         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3455                 bits = vli_decode_bits(&rl, look_ahead);
3456                 if (bits <= 0)
3457                         return -EIO;
3458
3459                 if (toggle) {
3460                         e = s + rl -1;
3461                         if (e >= c->bm_bits) {
3462                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3463                                 return -EIO;
3464                         }
3465                         _drbd_bm_set_bits(mdev, s, e);
3466                 }
3467
3468                 if (have < bits) {
3469                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3470                                 have, bits, look_ahead,
3471                                 (unsigned int)(bs.cur.b - p->code),
3472                                 (unsigned int)bs.buf_len);
3473                         return -EIO;
3474                 }
3475                 look_ahead >>= bits;
3476                 have -= bits;
3477
3478                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3479                 if (bits < 0)
3480                         return -EIO;
3481                 look_ahead |= tmp << have;
3482                 have += bits;
3483         }
3484
3485         c->bit_offset = s;
3486         bm_xfer_ctx_bit_to_word_offset(c);
3487
3488         return (s != c->bm_bits);
3489 }
3490
3491 /**
3492  * decode_bitmap_c
3493  *
3494  * Return 0 when done, 1 when another iteration is needed, and a negative error
3495  * code upon failure.
3496  */
3497 static int
3498 decode_bitmap_c(struct drbd_conf *mdev,
3499                 struct p_compressed_bm *p,
3500                 struct bm_xfer_ctx *c)
3501 {
3502         if (DCBP_get_code(p) == RLE_VLI_Bits)
3503                 return recv_bm_rle_bits(mdev, p, c);
3504
3505         /* other variants had been implemented for evaluation,
3506          * but have been dropped as this one turned out to be "best"
3507          * during all our tests. */
3508
3509         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3510         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3511         return -EIO;
3512 }
3513
3514 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3515                 const char *direction, struct bm_xfer_ctx *c)
3516 {
3517         /* what would it take to transfer it "plaintext" */
3518         unsigned plain = sizeof(struct p_header80) *
3519                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3520                 + c->bm_words * sizeof(long);
3521         unsigned total = c->bytes[0] + c->bytes[1];
3522         unsigned r;
3523
3524         /* total can not be zero. but just in case: */
3525         if (total == 0)
3526                 return;
3527
3528         /* don't report if not compressed */
3529         if (total >= plain)
3530                 return;
3531
3532         /* total < plain. check for overflow, still */
3533         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3534                                     : (1000 * total / plain);
3535
3536         if (r > 1000)
3537                 r = 1000;
3538
3539         r = 1000 - r;
3540         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3541              "total %u; compression: %u.%u%%\n",
3542                         direction,
3543                         c->bytes[1], c->packets[1],
3544                         c->bytes[0], c->packets[0],
3545                         total, r/10, r % 10);
3546 }
3547
3548 /* Since we are processing the bitfield from lower addresses to higher,
3549    it does not matter if the process it in 32 bit chunks or 64 bit
3550    chunks as long as it is little endian. (Understand it as byte stream,
3551    beginning with the lowest byte...) If we would use big endian
3552    we would need to process it from the highest address to the lowest,
3553    in order to be agnostic to the 32 vs 64 bits issue.
3554
3555    returns 0 on failure, 1 if we successfully received it. */
3556 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3557 {
3558         struct bm_xfer_ctx c;
3559         void *buffer;
3560         int err;
3561         int ok = false;
3562         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3563
3564         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3565         /* you are supposed to send additional out-of-sync information
3566          * if you actually set bits during this phase */
3567
3568         /* maybe we should use some per thread scratch page,
3569          * and allocate that during initial device creation? */
3570         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3571         if (!buffer) {
3572                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3573                 goto out;
3574         }
3575
3576         c = (struct bm_xfer_ctx) {
3577                 .bm_bits = drbd_bm_bits(mdev),
3578                 .bm_words = drbd_bm_words(mdev),
3579         };
3580
3581         for(;;) {
3582                 if (cmd == P_BITMAP) {
3583                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3584                 } else if (cmd == P_COMPRESSED_BITMAP) {
3585                         /* MAYBE: sanity check that we speak proto >= 90,
3586                          * and the feature is enabled! */
3587                         struct p_compressed_bm *p;
3588
3589                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3590                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3591                                 goto out;
3592                         }
3593                         /* use the page buff */
3594                         p = buffer;
3595                         memcpy(p, h, sizeof(*h));
3596                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3597                                 goto out;
3598                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3599                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3600                                 goto out;
3601                         }
3602                         err = decode_bitmap_c(mdev, p, &c);
3603                 } else {
3604                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3605                         goto out;
3606                 }
3607
3608                 c.packets[cmd == P_BITMAP]++;
3609                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3610
3611                 if (err <= 0) {
3612                         if (err < 0)
3613                                 goto out;
3614                         break;
3615                 }
3616                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3617                         goto out;
3618         }
3619
3620         INFO_bm_xfer_stats(mdev, "receive", &c);
3621
3622         if (mdev->state.conn == C_WF_BITMAP_T) {
3623                 enum drbd_state_rv rv;
3624
3625                 ok = !drbd_send_bitmap(mdev);
3626                 if (!ok)
3627                         goto out;
3628                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3629                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3630                 D_ASSERT(rv == SS_SUCCESS);
3631         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3632                 /* admin may have requested C_DISCONNECTING,
3633                  * other threads may have noticed network errors */
3634                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3635                     drbd_conn_str(mdev->state.conn));
3636         }
3637
3638         ok = true;
3639  out:
3640         drbd_bm_unlock(mdev);
3641         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3642                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3643         free_page((unsigned long) buffer);
3644         return ok;
3645 }
3646
3647 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3648 {
3649         /* TODO zero copy sink :) */
3650         static char sink[128];
3651         int size, want, r;
3652
3653         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3654                  cmd, data_size);
3655
3656         size = data_size;
3657         while (size > 0) {
3658                 want = min_t(int, size, sizeof(sink));
3659                 r = drbd_recv(mdev, sink, want);
3660                 ERR_IF(r <= 0) break;
3661                 size -= r;
3662         }
3663         return size == 0;
3664 }
3665
3666 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3667 {
3668         /* Make sure we've acked all the TCP data associated
3669          * with the data requests being unplugged */
3670         drbd_tcp_quickack(mdev->data.socket);
3671
3672         return true;
3673 }
3674
3675 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3676 {
3677         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3678
3679         switch (mdev->state.conn) {
3680         case C_WF_SYNC_UUID:
3681         case C_WF_BITMAP_T:
3682         case C_BEHIND:
3683                         break;
3684         default:
3685                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3686                                 drbd_conn_str(mdev->state.conn));
3687         }
3688
3689         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3690
3691         return true;
3692 }
3693
3694 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3695
3696 struct data_cmd {
3697         int expect_payload;
3698         size_t pkt_size;
3699         drbd_cmd_handler_f function;
3700 };
3701
3702 static struct data_cmd drbd_cmd_handler[] = {
3703         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3704         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3705         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3706         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3707         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3708         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3709         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3710         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3711         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3712         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3713         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3714         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3715         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3716         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3717         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3718         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3719         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3720         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3721         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3722         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3723         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3724         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3725         /* anything missing from this table is in
3726          * the asender_tbl, see get_asender_cmd */
3727         [P_MAX_CMD]         = { 0, 0, NULL },
3728 };
3729
3730 /* All handler functions that expect a sub-header get that sub-heder in
3731    mdev->data.rbuf.header.head.payload.
3732
3733    Usually in mdev->data.rbuf.header.head the callback can find the usual
3734    p_header, but they may not rely on that. Since there is also p_header95 !
3735  */
3736
3737 static void drbdd(struct drbd_conf *mdev)
3738 {
3739         union p_header *header = &mdev->data.rbuf.header;
3740         unsigned int packet_size;
3741         enum drbd_packets cmd;
3742         size_t shs; /* sub header size */
3743         int rv;
3744
3745         while (get_t_state(&mdev->receiver) == Running) {
3746                 drbd_thread_current_set_cpu(mdev);
3747                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3748                         goto err_out;
3749
3750                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3751                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3752                         goto err_out;
3753                 }
3754
3755                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3756                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3757                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3758                         goto err_out;
3759                 }
3760
3761                 if (shs) {
3762                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3763                         if (unlikely(rv != shs)) {
3764                                 if (!signal_pending(current))
3765                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3766                                 goto err_out;
3767                         }
3768                 }
3769
3770                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3771
3772                 if (unlikely(!rv)) {
3773                         dev_err(DEV, "error receiving %s, l: %d!\n",
3774                             cmdname(cmd), packet_size);
3775                         goto err_out;
3776                 }
3777         }
3778
3779         if (0) {
3780         err_out:
3781                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3782         }
3783         /* If we leave here, we probably want to update at least the
3784          * "Connected" indicator on stable storage. Do so explicitly here. */
3785         drbd_md_sync(mdev);
3786 }
3787
3788 void drbd_flush_workqueue(struct drbd_conf *mdev)
3789 {
3790         struct drbd_wq_barrier barr;
3791
3792         barr.w.cb = w_prev_work_done;
3793         init_completion(&barr.done);
3794         drbd_queue_work(&mdev->data.work, &barr.w);
3795         wait_for_completion(&barr.done);
3796 }
3797
3798 void drbd_free_tl_hash(struct drbd_conf *mdev)
3799 {
3800         struct hlist_head *h;
3801
3802         spin_lock_irq(&mdev->req_lock);
3803
3804         if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3805                 spin_unlock_irq(&mdev->req_lock);
3806                 return;
3807         }
3808         /* paranoia code */
3809         for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3810                 if (h->first)
3811                         dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3812                                 (int)(h - mdev->ee_hash), h->first);
3813         kfree(mdev->ee_hash);
3814         mdev->ee_hash = NULL;
3815         mdev->ee_hash_s = 0;
3816
3817         /* We may not have had the chance to wait for all locally pending
3818          * application requests. The hlist_add_fake() prevents access after
3819          * free on master bio completion. */
3820         for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) {
3821                 struct drbd_request *req;
3822                 struct hlist_node *pos, *n;
3823                 hlist_for_each_entry_safe(req, pos, n, h, collision) {
3824                         hlist_del_init(&req->collision);
3825                         hlist_add_fake(&req->collision);
3826                 }
3827         }
3828
3829         kfree(mdev->tl_hash);
3830         mdev->tl_hash = NULL;
3831         mdev->tl_hash_s = 0;
3832         spin_unlock_irq(&mdev->req_lock);
3833 }
3834
3835 static void drbd_disconnect(struct drbd_conf *mdev)
3836 {
3837         enum drbd_fencing_p fp;
3838         union drbd_state os, ns;
3839         int rv = SS_UNKNOWN_ERROR;
3840         unsigned int i;
3841
3842         if (mdev->state.conn == C_STANDALONE)
3843                 return;
3844
3845         /* We are about to start the cleanup after connection loss.
3846          * Make sure drbd_make_request knows about that.
3847          * Usually we should be in some network failure state already,
3848          * but just in case we are not, we fix it up here.
3849          */
3850         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3851
3852         /* asender does not clean up anything. it must not interfere, either */
3853         drbd_thread_stop(&mdev->asender);
3854         drbd_free_sock(mdev);
3855
3856         /* wait for current activity to cease. */
3857         spin_lock_irq(&mdev->req_lock);
3858         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3859         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3860         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3861         spin_unlock_irq(&mdev->req_lock);
3862
3863         /* We do not have data structures that would allow us to
3864          * get the rs_pending_cnt down to 0 again.
3865          *  * On C_SYNC_TARGET we do not have any data structures describing
3866          *    the pending RSDataRequest's we have sent.
3867          *  * On C_SYNC_SOURCE there is no data structure that tracks
3868          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3869          *  And no, it is not the sum of the reference counts in the
3870          *  resync_LRU. The resync_LRU tracks the whole operation including
3871          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3872          *  on the fly. */
3873         drbd_rs_cancel_all(mdev);
3874         mdev->rs_total = 0;
3875         mdev->rs_failed = 0;
3876         atomic_set(&mdev->rs_pending_cnt, 0);
3877         wake_up(&mdev->misc_wait);
3878
3879         /* make sure syncer is stopped and w_resume_next_sg queued */
3880         del_timer_sync(&mdev->resync_timer);
3881         resync_timer_fn((unsigned long)mdev);
3882
3883         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3884          * w_make_resync_request etc. which may still be on the worker queue
3885          * to be "canceled" */
3886         drbd_flush_workqueue(mdev);
3887
3888         /* This also does reclaim_net_ee().  If we do this too early, we might
3889          * miss some resync ee and pages.*/
3890         drbd_process_done_ee(mdev);
3891
3892         kfree(mdev->p_uuid);
3893         mdev->p_uuid = NULL;
3894
3895         if (!is_susp(mdev->state))
3896                 tl_clear(mdev);
3897
3898         dev_info(DEV, "Connection closed\n");
3899
3900         drbd_md_sync(mdev);
3901
3902         fp = FP_DONT_CARE;
3903         if (get_ldev(mdev)) {
3904                 fp = mdev->ldev->dc.fencing;
3905                 put_ldev(mdev);
3906         }
3907
3908         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3909                 drbd_try_outdate_peer_async(mdev);
3910
3911         spin_lock_irq(&mdev->req_lock);
3912         os = mdev->state;
3913         if (os.conn >= C_UNCONNECTED) {
3914                 /* Do not restart in case we are C_DISCONNECTING */
3915                 ns = os;
3916                 ns.conn = C_UNCONNECTED;
3917                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3918         }
3919         spin_unlock_irq(&mdev->req_lock);
3920
3921         if (os.conn == C_DISCONNECTING) {
3922                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3923
3924                 crypto_free_hash(mdev->cram_hmac_tfm);
3925                 mdev->cram_hmac_tfm = NULL;
3926
3927                 kfree(mdev->net_conf);
3928                 mdev->net_conf = NULL;
3929                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3930         }
3931
3932         /* serialize with bitmap writeout triggered by the state change,
3933          * if any. */
3934         wait_event(mdev->misc_wait, !drbd_test_flag(mdev, BITMAP_IO));
3935
3936         /* tcp_close and release of sendpage pages can be deferred.  I don't
3937          * want to use SO_LINGER, because apparently it can be deferred for
3938          * more than 20 seconds (longest time I checked).
3939          *
3940          * Actually we don't care for exactly when the network stack does its
3941          * put_page(), but release our reference on these pages right here.
3942          */
3943         i = drbd_release_ee(mdev, &mdev->net_ee);
3944         if (i)
3945                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3946         i = atomic_read(&mdev->pp_in_use_by_net);
3947         if (i)
3948                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3949         i = atomic_read(&mdev->pp_in_use);
3950         if (i)
3951                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3952
3953         D_ASSERT(list_empty(&mdev->read_ee));
3954         D_ASSERT(list_empty(&mdev->active_ee));
3955         D_ASSERT(list_empty(&mdev->sync_ee));
3956         D_ASSERT(list_empty(&mdev->done_ee));
3957
3958         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3959         atomic_set(&mdev->current_epoch->epoch_size, 0);
3960         D_ASSERT(list_empty(&mdev->current_epoch->list));
3961 }
3962
3963 /*
3964  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3965  * we can agree on is stored in agreed_pro_version.
3966  *
3967  * feature flags and the reserved array should be enough room for future
3968  * enhancements of the handshake protocol, and possible plugins...
3969  *
3970  * for now, they are expected to be zero, but ignored.
3971  */
3972 static int drbd_send_handshake(struct drbd_conf *mdev)
3973 {
3974         /* ASSERT current == mdev->receiver ... */
3975         struct p_handshake *p = &mdev->data.sbuf.handshake;
3976         int ok;
3977
3978         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3979                 dev_err(DEV, "interrupted during initial handshake\n");
3980                 return 0; /* interrupted. not ok. */
3981         }
3982
3983         if (mdev->data.socket == NULL) {
3984                 mutex_unlock(&mdev->data.mutex);
3985                 return 0;
3986         }
3987
3988         memset(p, 0, sizeof(*p));
3989         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3990         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3991         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3992                              (struct p_header80 *)p, sizeof(*p), 0 );
3993         mutex_unlock(&mdev->data.mutex);
3994         return ok;
3995 }
3996
3997 /*
3998  * return values:
3999  *   1 yes, we have a valid connection
4000  *   0 oops, did not work out, please try again
4001  *  -1 peer talks different language,
4002  *     no point in trying again, please go standalone.
4003  */
4004 static int drbd_do_handshake(struct drbd_conf *mdev)
4005 {
4006         /* ASSERT current == mdev->receiver ... */
4007         struct p_handshake *p = &mdev->data.rbuf.handshake;
4008         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4009         unsigned int length;
4010         enum drbd_packets cmd;
4011         int rv;
4012
4013         rv = drbd_send_handshake(mdev);
4014         if (!rv)
4015                 return 0;
4016
4017         rv = drbd_recv_header(mdev, &cmd, &length);
4018         if (!rv)
4019                 return 0;
4020
4021         if (cmd != P_HAND_SHAKE) {
4022                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4023                      cmdname(cmd), cmd);
4024                 return -1;
4025         }
4026
4027         if (length != expect) {
4028                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4029                      expect, length);
4030                 return -1;
4031         }
4032
4033         rv = drbd_recv(mdev, &p->head.payload, expect);
4034
4035         if (rv != expect) {
4036                 if (!signal_pending(current))
4037                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4038                 return 0;
4039         }
4040
4041         p->protocol_min = be32_to_cpu(p->protocol_min);
4042         p->protocol_max = be32_to_cpu(p->protocol_max);
4043         if (p->protocol_max == 0)
4044                 p->protocol_max = p->protocol_min;
4045
4046         if (PRO_VERSION_MAX < p->protocol_min ||
4047             PRO_VERSION_MIN > p->protocol_max)
4048                 goto incompat;
4049
4050         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4051
4052         dev_info(DEV, "Handshake successful: "
4053              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4054
4055         return 1;
4056
4057  incompat:
4058         dev_err(DEV, "incompatible DRBD dialects: "
4059             "I support %d-%d, peer supports %d-%d\n",
4060             PRO_VERSION_MIN, PRO_VERSION_MAX,
4061             p->protocol_min, p->protocol_max);
4062         return -1;
4063 }
4064
4065 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4066 static int drbd_do_auth(struct drbd_conf *mdev)
4067 {
4068         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4069         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4070         return -1;
4071 }
4072 #else
4073 #define CHALLENGE_LEN 64
4074
4075 /* Return value:
4076         1 - auth succeeded,
4077         0 - failed, try again (network error),
4078         -1 - auth failed, don't try again.
4079 */
4080
4081 static int drbd_do_auth(struct drbd_conf *mdev)
4082 {
4083         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4084         struct scatterlist sg;
4085         char *response = NULL;
4086         char *right_response = NULL;
4087         char *peers_ch = NULL;
4088         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4089         unsigned int resp_size;
4090         struct hash_desc desc;
4091         enum drbd_packets cmd;
4092         unsigned int length;
4093         int rv;
4094
4095         desc.tfm = mdev->cram_hmac_tfm;
4096         desc.flags = 0;
4097
4098         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4099                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4100         if (rv) {
4101                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4102                 rv = -1;
4103                 goto fail;
4104         }
4105
4106         get_random_bytes(my_challenge, CHALLENGE_LEN);
4107
4108         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4109         if (!rv)
4110                 goto fail;
4111
4112         rv = drbd_recv_header(mdev, &cmd, &length);
4113         if (!rv)
4114                 goto fail;
4115
4116         if (cmd != P_AUTH_CHALLENGE) {
4117                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4118                     cmdname(cmd), cmd);
4119                 rv = 0;
4120                 goto fail;
4121         }
4122
4123         if (length > CHALLENGE_LEN * 2) {
4124                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4125                 rv = -1;
4126                 goto fail;
4127         }
4128
4129         peers_ch = kmalloc(length, GFP_NOIO);
4130         if (peers_ch == NULL) {
4131                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4132                 rv = -1;
4133                 goto fail;
4134         }
4135
4136         rv = drbd_recv(mdev, peers_ch, length);
4137
4138         if (rv != length) {
4139                 if (!signal_pending(current))
4140                         dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4141                 rv = 0;
4142                 goto fail;
4143         }
4144
4145         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4146         response = kmalloc(resp_size, GFP_NOIO);
4147         if (response == NULL) {
4148                 dev_err(DEV, "kmalloc of response failed\n");
4149                 rv = -1;
4150                 goto fail;
4151         }
4152
4153         sg_init_table(&sg, 1);
4154         sg_set_buf(&sg, peers_ch, length);
4155
4156         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4157         if (rv) {
4158                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4159                 rv = -1;
4160                 goto fail;
4161         }
4162
4163         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4164         if (!rv)
4165                 goto fail;
4166
4167         rv = drbd_recv_header(mdev, &cmd, &length);
4168         if (!rv)
4169                 goto fail;
4170
4171         if (cmd != P_AUTH_RESPONSE) {
4172                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4173                         cmdname(cmd), cmd);
4174                 rv = 0;
4175                 goto fail;
4176         }
4177
4178         if (length != resp_size) {
4179                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4180                 rv = 0;
4181                 goto fail;
4182         }
4183
4184         rv = drbd_recv(mdev, response , resp_size);
4185
4186         if (rv != resp_size) {
4187                 if (!signal_pending(current))
4188                         dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4189                 rv = 0;
4190                 goto fail;
4191         }
4192
4193         right_response = kmalloc(resp_size, GFP_NOIO);
4194         if (right_response == NULL) {
4195                 dev_err(DEV, "kmalloc of right_response failed\n");
4196                 rv = -1;
4197                 goto fail;
4198         }
4199
4200         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4201
4202         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4203         if (rv) {
4204                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4205                 rv = -1;
4206                 goto fail;
4207         }
4208
4209         rv = !memcmp(response, right_response, resp_size);
4210
4211         if (rv)
4212                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4213                      resp_size, mdev->net_conf->cram_hmac_alg);
4214         else
4215                 rv = -1;
4216
4217  fail:
4218         kfree(peers_ch);
4219         kfree(response);
4220         kfree(right_response);
4221
4222         return rv;
4223 }
4224 #endif
4225
4226 int drbdd_init(struct drbd_thread *thi)
4227 {
4228         struct drbd_conf *mdev = thi->mdev;
4229         unsigned int minor = mdev_to_minor(mdev);
4230         int h;
4231
4232         sprintf(current->comm, "drbd%d_receiver", minor);
4233
4234         dev_info(DEV, "receiver (re)started\n");
4235
4236         do {
4237                 h = drbd_connect(mdev);
4238                 if (h == 0) {
4239                         drbd_disconnect(mdev);
4240                         schedule_timeout_interruptible(HZ);
4241                 }
4242                 if (h == -1) {
4243                         dev_warn(DEV, "Discarding network configuration.\n");
4244                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4245                 }
4246         } while (h == 0);
4247
4248         if (h > 0) {
4249                 if (get_net_conf(mdev)) {
4250                         drbdd(mdev);
4251                         put_net_conf(mdev);
4252                 }
4253         }
4254
4255         drbd_disconnect(mdev);
4256
4257         dev_info(DEV, "receiver terminated\n");
4258         return 0;
4259 }
4260
4261 /* ********* acknowledge sender ******** */
4262
4263 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4264 {
4265         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4266
4267         int retcode = be32_to_cpu(p->retcode);
4268
4269         if (retcode >= SS_SUCCESS) {
4270                 drbd_set_flag(mdev, CL_ST_CHG_SUCCESS);
4271         } else {
4272                 drbd_set_flag(mdev, CL_ST_CHG_FAIL);
4273                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4274                     drbd_set_st_err_str(retcode), retcode);
4275         }
4276         wake_up(&mdev->state_wait);
4277
4278         return true;
4279 }
4280
4281 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4282 {
4283         return drbd_send_ping_ack(mdev);
4284
4285 }
4286
4287 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4288 {
4289         /* restore idle timeout */
4290         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4291         if (!drbd_test_and_set_flag(mdev, GOT_PING_ACK))
4292                 wake_up(&mdev->misc_wait);
4293
4294         return true;
4295 }
4296
4297 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4298 {
4299         struct p_block_ack *p = (struct p_block_ack *)h;
4300         sector_t sector = be64_to_cpu(p->sector);
4301         int blksize = be32_to_cpu(p->blksize);
4302
4303         D_ASSERT(mdev->agreed_pro_version >= 89);
4304
4305         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4306
4307         if (get_ldev(mdev)) {
4308                 drbd_rs_complete_io(mdev, sector);
4309                 drbd_set_in_sync(mdev, sector, blksize);
4310                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4311                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4312                 put_ldev(mdev);
4313         }
4314         dec_rs_pending(mdev);
4315         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4316
4317         return true;
4318 }
4319
4320 /* when we receive the ACK for a write request,
4321  * verify that we actually know about it */
4322 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4323         u64 id, sector_t sector)
4324 {
4325         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4326         struct hlist_node *n;
4327         struct drbd_request *req;
4328
4329         hlist_for_each_entry(req, n, slot, collision) {
4330                 if ((unsigned long)req == (unsigned long)id) {
4331                         if (req->sector != sector) {
4332                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4333                                     "wrong sector (%llus versus %llus)\n", req,
4334                                     (unsigned long long)req->sector,
4335                                     (unsigned long long)sector);
4336                                 break;
4337                         }
4338                         return req;
4339                 }
4340         }
4341         return NULL;
4342 }
4343
4344 typedef struct drbd_request *(req_validator_fn)
4345         (struct drbd_conf *mdev, u64 id, sector_t sector);
4346
4347 static int validate_req_change_req_state(struct drbd_conf *mdev,
4348         u64 id, sector_t sector, req_validator_fn validator,
4349         const char *func, enum drbd_req_event what)
4350 {
4351         struct drbd_request *req;
4352         struct bio_and_error m;
4353
4354         spin_lock_irq(&mdev->req_lock);
4355         req = validator(mdev, id, sector);
4356         if (unlikely(!req)) {
4357                 spin_unlock_irq(&mdev->req_lock);
4358
4359                 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4360                         (void *)(unsigned long)id, (unsigned long long)sector);
4361                 return false;
4362         }
4363         __req_mod(req, what, &m);
4364         spin_unlock_irq(&mdev->req_lock);
4365
4366         if (m.bio)
4367                 complete_master_bio(mdev, &m);
4368         return true;
4369 }
4370
4371 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4372 {
4373         struct p_block_ack *p = (struct p_block_ack *)h;
4374         sector_t sector = be64_to_cpu(p->sector);
4375         int blksize = be32_to_cpu(p->blksize);
4376         enum drbd_req_event what;
4377
4378         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4379
4380         if (is_syncer_block_id(p->block_id)) {
4381                 drbd_set_in_sync(mdev, sector, blksize);
4382                 dec_rs_pending(mdev);
4383                 return true;
4384         }
4385         switch (be16_to_cpu(h->command)) {
4386         case P_RS_WRITE_ACK:
4387                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4388                 what = write_acked_by_peer_and_sis;
4389                 break;
4390         case P_WRITE_ACK:
4391                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4392                 what = write_acked_by_peer;
4393                 break;
4394         case P_RECV_ACK:
4395                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4396                 what = recv_acked_by_peer;
4397                 break;
4398         case P_DISCARD_ACK:
4399                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4400                 what = conflict_discarded_by_peer;
4401                 break;
4402         default:
4403                 D_ASSERT(0);
4404                 return false;
4405         }
4406
4407         return validate_req_change_req_state(mdev, p->block_id, sector,
4408                 _ack_id_to_req, __func__ , what);
4409 }
4410
4411 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4412 {
4413         struct p_block_ack *p = (struct p_block_ack *)h;
4414         sector_t sector = be64_to_cpu(p->sector);
4415         int size = be32_to_cpu(p->blksize);
4416         struct drbd_request *req;
4417         struct bio_and_error m;
4418
4419         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4420
4421         if (is_syncer_block_id(p->block_id)) {
4422                 dec_rs_pending(mdev);
4423                 drbd_rs_failed_io(mdev, sector, size);
4424                 return true;
4425         }
4426
4427         spin_lock_irq(&mdev->req_lock);
4428         req = _ack_id_to_req(mdev, p->block_id, sector);
4429         if (!req) {
4430                 spin_unlock_irq(&mdev->req_lock);
4431                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4432                     mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4433                         /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4434                            The master bio might already be completed, therefore the
4435                            request is no longer in the collision hash.
4436                            => Do not try to validate block_id as request. */
4437                         /* In Protocol B we might already have got a P_RECV_ACK
4438                            but then get a P_NEG_ACK after wards. */
4439                         drbd_set_out_of_sync(mdev, sector, size);
4440                         return true;
4441                 } else {
4442                         dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4443                                 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4444                         return false;
4445                 }
4446         }
4447         __req_mod(req, neg_acked, &m);
4448         spin_unlock_irq(&mdev->req_lock);
4449
4450         if (m.bio)
4451                 complete_master_bio(mdev, &m);
4452         return true;
4453 }
4454
4455 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4456 {
4457         struct p_block_ack *p = (struct p_block_ack *)h;
4458         sector_t sector = be64_to_cpu(p->sector);
4459
4460         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4461         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4462             (unsigned long long)sector, be32_to_cpu(p->blksize));
4463
4464         return validate_req_change_req_state(mdev, p->block_id, sector,
4465                 _ar_id_to_req, __func__ , neg_acked);
4466 }
4467
4468 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4469 {
4470         sector_t sector;
4471         int size;
4472         struct p_block_ack *p = (struct p_block_ack *)h;
4473
4474         sector = be64_to_cpu(p->sector);
4475         size = be32_to_cpu(p->blksize);
4476
4477         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4478
4479         dec_rs_pending(mdev);
4480
4481         if (get_ldev_if_state(mdev, D_FAILED)) {
4482                 drbd_rs_complete_io(mdev, sector);
4483                 switch (be16_to_cpu(h->command)) {
4484                 case P_NEG_RS_DREPLY:
4485                         drbd_rs_failed_io(mdev, sector, size);
4486                 case P_RS_CANCEL:
4487                         break;
4488                 default:
4489                         D_ASSERT(0);
4490                         put_ldev(mdev);
4491                         return false;
4492                 }
4493                 put_ldev(mdev);
4494         }
4495
4496         return true;
4497 }
4498
4499 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4500 {
4501         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4502
4503         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4504
4505         if (mdev->state.conn == C_AHEAD &&
4506             atomic_read(&mdev->ap_in_flight) == 0 &&
4507             !drbd_test_and_set_flag(mdev, AHEAD_TO_SYNC_SOURCE)) {
4508                 mdev->start_resync_timer.expires = jiffies + HZ;
4509                 add_timer(&mdev->start_resync_timer);
4510         }
4511
4512         return true;
4513 }
4514
4515 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4516 {
4517         struct p_block_ack *p = (struct p_block_ack *)h;
4518         struct drbd_work *w;
4519         sector_t sector;
4520         int size;
4521
4522         sector = be64_to_cpu(p->sector);
4523         size = be32_to_cpu(p->blksize);
4524
4525         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4526
4527         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4528                 drbd_ov_oos_found(mdev, sector, size);
4529         else
4530                 ov_oos_print(mdev);
4531
4532         if (!get_ldev(mdev))
4533                 return true;
4534
4535         drbd_rs_complete_io(mdev, sector);
4536         dec_rs_pending(mdev);
4537
4538         --mdev->ov_left;
4539
4540         /* let's advance progress step marks only for every other megabyte */
4541         if ((mdev->ov_left & 0x200) == 0x200)
4542                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4543
4544         if (mdev->ov_left == 0) {
4545                 w = kmalloc(sizeof(*w), GFP_NOIO);
4546                 if (w) {
4547                         w->cb = w_ov_finished;
4548                         drbd_queue_work_front(&mdev->data.work, w);
4549                 } else {
4550                         dev_err(DEV, "kmalloc(w) failed.");
4551                         ov_oos_print(mdev);
4552                         drbd_resync_finished(mdev);
4553                 }
4554         }
4555         put_ldev(mdev);
4556         return true;
4557 }
4558
4559 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4560 {
4561         return true;
4562 }
4563
4564 struct asender_cmd {
4565         size_t pkt_size;
4566         int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4567 };
4568
4569 static struct asender_cmd *get_asender_cmd(int cmd)
4570 {
4571         static struct asender_cmd asender_tbl[] = {
4572                 /* anything missing from this table is in
4573                  * the drbd_cmd_handler (drbd_default_handler) table,
4574                  * see the beginning of drbdd() */
4575         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4576         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4577         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4578         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4579         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4580         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4581         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4582         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4583         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4584         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4585         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4586         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4587         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4588         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4589         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4590         [P_MAX_CMD]         = { 0, NULL },
4591         };
4592         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4593                 return NULL;
4594         return &asender_tbl[cmd];
4595 }
4596
4597 int drbd_asender(struct drbd_thread *thi)
4598 {
4599         struct drbd_conf *mdev = thi->mdev;
4600         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4601         struct asender_cmd *cmd = NULL;
4602
4603         int rv, len;
4604         void *buf    = h;
4605         int received = 0;
4606         int expect   = sizeof(struct p_header80);
4607         int empty;
4608         int ping_timeout_active = 0;
4609
4610         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4611
4612         current->policy = SCHED_RR;  /* Make this a realtime task! */
4613         current->rt_priority = 2;    /* more important than all other tasks */
4614
4615         while (get_t_state(thi) == Running) {
4616                 drbd_thread_current_set_cpu(mdev);
4617                 if (drbd_test_and_clear_flag(mdev, SEND_PING)) {
4618                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4619                         mdev->meta.socket->sk->sk_rcvtimeo =
4620                                 mdev->net_conf->ping_timeo*HZ/10;
4621                         ping_timeout_active = 1;
4622                 }
4623
4624                 /* conditionally cork;
4625                  * it may hurt latency if we cork without much to send */
4626                 if (!mdev->net_conf->no_cork &&
4627                         3 < atomic_read(&mdev->unacked_cnt))
4628                         drbd_tcp_cork(mdev->meta.socket);
4629                 while (1) {
4630                         drbd_clear_flag(mdev, SIGNAL_ASENDER);
4631                         flush_signals(current);
4632                         if (!drbd_process_done_ee(mdev))
4633                                 goto reconnect;
4634                         /* to avoid race with newly queued ACKs */
4635                         drbd_set_flag(mdev, SIGNAL_ASENDER);
4636                         spin_lock_irq(&mdev->req_lock);
4637                         empty = list_empty(&mdev->done_ee);
4638                         spin_unlock_irq(&mdev->req_lock);
4639                         /* new ack may have been queued right here,
4640                          * but then there is also a signal pending,
4641                          * and we start over... */
4642                         if (empty)
4643                                 break;
4644                 }
4645                 /* but unconditionally uncork unless disabled */
4646                 if (!mdev->net_conf->no_cork)
4647                         drbd_tcp_uncork(mdev->meta.socket);
4648
4649                 /* short circuit, recv_msg would return EINTR anyways. */
4650                 if (signal_pending(current))
4651                         continue;
4652
4653                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4654                                      buf, expect-received, 0);
4655                 drbd_clear_flag(mdev, SIGNAL_ASENDER);
4656
4657                 flush_signals(current);
4658
4659                 /* Note:
4660                  * -EINTR        (on meta) we got a signal
4661                  * -EAGAIN       (on meta) rcvtimeo expired
4662                  * -ECONNRESET   other side closed the connection
4663                  * -ERESTARTSYS  (on data) we got a signal
4664                  * rv <  0       other than above: unexpected error!
4665                  * rv == expected: full header or command
4666                  * rv <  expected: "woken" by signal during receive
4667                  * rv == 0       : "connection shut down by peer"
4668                  */
4669                 if (likely(rv > 0)) {
4670                         received += rv;
4671                         buf      += rv;
4672                 } else if (rv == 0) {
4673                         if (drbd_test_flag(mdev, DISCONNECT_SENT)) {
4674                                 long t; /* time_left */
4675                                 t = wait_event_timeout(mdev->state_wait, mdev->state.conn < C_CONNECTED,
4676                                                        mdev->net_conf->ping_timeo * HZ/10);
4677                                 if (t)
4678                                         break;
4679                         }
4680                         dev_err(DEV, "meta connection shut down by peer.\n");
4681                         goto reconnect;
4682                 } else if (rv == -EAGAIN) {
4683                         /* If the data socket received something meanwhile,
4684                          * that is good enough: peer is still alive. */
4685                         if (time_after(mdev->last_received,
4686                                 jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4687                                 continue;
4688                         if (ping_timeout_active) {
4689                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4690                                 goto reconnect;
4691                         }
4692                         drbd_set_flag(mdev, SEND_PING);
4693                         continue;
4694                 } else if (rv == -EINTR) {
4695                         continue;
4696                 } else {
4697                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4698                         goto reconnect;
4699                 }
4700
4701                 if (received == expect && cmd == NULL) {
4702                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4703                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4704                                     be32_to_cpu(h->magic),
4705                                     be16_to_cpu(h->command),
4706                                     be16_to_cpu(h->length));
4707                                 goto reconnect;
4708                         }
4709                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4710                         len = be16_to_cpu(h->length);
4711                         if (unlikely(cmd == NULL)) {
4712                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4713                                     be32_to_cpu(h->magic),
4714                                     be16_to_cpu(h->command),
4715                                     be16_to_cpu(h->length));
4716                                 goto disconnect;
4717                         }
4718                         expect = cmd->pkt_size;
4719                         ERR_IF(len != expect-sizeof(struct p_header80))
4720                                 goto reconnect;
4721                 }
4722                 if (received == expect) {
4723                         mdev->last_received = jiffies;
4724                         D_ASSERT(cmd != NULL);
4725                         if (!cmd->process(mdev, h))
4726                                 goto reconnect;
4727
4728                         /* the idle_timeout (ping-int)
4729                          * has been restored in got_PingAck() */
4730                         if (cmd == get_asender_cmd(P_PING_ACK))
4731                                 ping_timeout_active = 0;
4732
4733                         buf      = h;
4734                         received = 0;
4735                         expect   = sizeof(struct p_header80);
4736                         cmd      = NULL;
4737                 }
4738         }
4739
4740         if (0) {
4741 reconnect:
4742                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4743                 drbd_md_sync(mdev);
4744         }
4745         if (0) {
4746 disconnect:
4747                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4748                 drbd_md_sync(mdev);
4749         }
4750         drbd_clear_flag(mdev, SIGNAL_ASENDER);
4751
4752         D_ASSERT(mdev->state.conn < C_CONNECTED);
4753         dev_info(DEV, "asender terminated\n");
4754
4755         return 0;
4756 }