drivers/block/drbd/drbd_receiver.c (firefly-linux-kernel-4.4.55.git)
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
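/*
 * Editor's illustrative sketch, not part of the original file: with
 * page->private serving as the "next" pointer, walking such a chain is
 * simply
 *
 *	struct page *p;
 *	for (p = head; p; p = page_chain_next(p))
 *		handle_one_page(p);	(handle_one_page() is a hypothetical callback)
 *
 * The page_chain helpers below implement this traversal plus splitting
 * and splicing of such chains.
 */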
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
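/*
 * Editor's caller-side sketch (not part of the original file): a receive
 * path that needs a payload buffer pairs the two calls roughly like this,
 * assuming nr_pages was computed from the request size:
 *
 *	struct page *page = drbd_alloc_pages(mdev, nr_pages, true);
 *	if (!page)
 *		return NULL;	(with retry == true this only happens when signalled)
 *	...
 *	drbd_free_pages(mdev, page, 0);
 *
 * In this file the allocation usually happens indirectly, through
 * drbd_alloc_peer_req() further below.
 */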
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (page == NULL)
299                 return;
300
301         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
302                 i = page_chain_free(page);
303         else {
304                 struct page *tmp;
305                 tmp = page_chain_tail(page, &i);
306                 spin_lock(&drbd_pp_lock);
307                 page_chain_add(&drbd_pp_pool, page, tmp);
308                 drbd_pp_vacant += i;
309                 spin_unlock(&drbd_pp_lock);
310         }
311         i = atomic_sub_return(i, a);
312         if (i < 0)
313                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
314                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
315         wake_up(&drbd_pp_wait);
316 }
317
318 /*
319 You need to hold the req_lock:
320  _drbd_wait_ee_list_empty()
321
322 You must not have the req_lock:
323  drbd_free_peer_req()
324  drbd_alloc_peer_req()
325  drbd_free_peer_reqs()
326  drbd_ee_fix_bhs()
327  drbd_finish_peer_reqs()
328  drbd_clear_done_ee()
329  drbd_wait_ee_list_empty()
330 */
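/*
 * Editor's sketch of the rule spelled out above: the leading-underscore
 * variant expects req_lock to be held by the caller, the plain variant
 * takes the lock itself.  Roughly:
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * versus a plain drbd_wait_ee_list_empty(mdev, &mdev->active_ee) from a
 * context that does not hold the lock.
 */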
331
332 struct drbd_peer_request *
333 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
334                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
335 {
336         struct drbd_peer_request *peer_req;
337         struct page *page = NULL;
338         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
339
340         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
341                 return NULL;
342
343         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
344         if (!peer_req) {
345                 if (!(gfp_mask & __GFP_NOWARN))
346                         dev_err(DEV, "%s: allocation failed\n", __func__);
347                 return NULL;
348         }
349
350         if (data_size) {
351                 page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
352                 if (!page)
353                         goto fail;
354         }
355
356         drbd_clear_interval(&peer_req->i);
357         peer_req->i.size = data_size;
358         peer_req->i.sector = sector;
359         peer_req->i.local = false;
360         peer_req->i.waiting = false;
361
362         peer_req->epoch = NULL;
363         peer_req->w.mdev = mdev;
364         peer_req->pages = page;
365         atomic_set(&peer_req->pending_bios, 0);
366         peer_req->flags = 0;
367         /*
368          * The block_id is opaque to the receiver.  It is not endianness
369          * converted, and sent back to the sender unchanged.
370          */
371         peer_req->block_id = id;
372
373         return peer_req;
374
375  fail:
376         mempool_free(peer_req, drbd_ee_mempool);
377         return NULL;
378 }
379
380 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
381                        int is_net)
382 {
383         if (peer_req->flags & EE_HAS_DIGEST)
384                 kfree(peer_req->digest);
385         drbd_free_pages(mdev, peer_req->pages, is_net);
386         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
387         D_ASSERT(drbd_interval_empty(&peer_req->i));
388         mempool_free(peer_req, drbd_ee_mempool);
389 }
390
391 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
392 {
393         LIST_HEAD(work_list);
394         struct drbd_peer_request *peer_req, *t;
395         int count = 0;
396         int is_net = list == &mdev->net_ee;
397
398         spin_lock_irq(&mdev->tconn->req_lock);
399         list_splice_init(list, &work_list);
400         spin_unlock_irq(&mdev->tconn->req_lock);
401
402         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
403                 __drbd_free_peer_req(mdev, peer_req, is_net);
404                 count++;
405         }
406         return count;
407 }
408
409 /*
410  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
411  */
412 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
413 {
414         LIST_HEAD(work_list);
415         LIST_HEAD(reclaimed);
416         struct drbd_peer_request *peer_req, *t;
417         int err = 0;
418
419         spin_lock_irq(&mdev->tconn->req_lock);
420         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
421         list_splice_init(&mdev->done_ee, &work_list);
422         spin_unlock_irq(&mdev->tconn->req_lock);
423
424         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
425                 drbd_free_net_peer_req(mdev, peer_req);
426
427         /* possible callbacks here:
428          * e_end_block, and e_end_resync_block, e_send_superseded.
429          * all ignore the last argument.
430          */
431         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
432                 int err2;
433
434                 /* list_del not necessary, next/prev members not touched */
435                 err2 = peer_req->w.cb(&peer_req->w, !!err);
436                 if (!err)
437                         err = err2;
438                 drbd_free_peer_req(mdev, peer_req);
439         }
440         wake_up(&mdev->ee_wait);
441
442         return err;
443 }
444
445 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
446                                      struct list_head *head)
447 {
448         DEFINE_WAIT(wait);
449
450         /* avoids spin_lock/unlock
451          * and calling prepare_to_wait in the fast path */
452         while (!list_empty(head)) {
453                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
454                 spin_unlock_irq(&mdev->tconn->req_lock);
455                 io_schedule();
456                 finish_wait(&mdev->ee_wait, &wait);
457                 spin_lock_irq(&mdev->tconn->req_lock);
458         }
459 }
460
461 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
462                                     struct list_head *head)
463 {
464         spin_lock_irq(&mdev->tconn->req_lock);
465         _drbd_wait_ee_list_empty(mdev, head);
466         spin_unlock_irq(&mdev->tconn->req_lock);
467 }
468
469 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
470 {
471         mm_segment_t oldfs;
472         struct kvec iov = {
473                 .iov_base = buf,
474                 .iov_len = size,
475         };
476         struct msghdr msg = {
477                 .msg_iovlen = 1,
478                 .msg_iov = (struct iovec *)&iov,
479                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
480         };
481         int rv;
482
483         oldfs = get_fs();
484         set_fs(KERNEL_DS);
485         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
486         set_fs(oldfs);
487
488         return rv;
489 }
490
491 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
492 {
493         mm_segment_t oldfs;
494         struct kvec iov = {
495                 .iov_base = buf,
496                 .iov_len = size,
497         };
498         struct msghdr msg = {
499                 .msg_iovlen = 1,
500                 .msg_iov = (struct iovec *)&iov,
501                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
502         };
503         int rv;
504
505         oldfs = get_fs();
506         set_fs(KERNEL_DS);
507         rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
508         set_fs(oldfs);
509
510         if (rv < 0) {
511                 if (rv == -ECONNRESET)
512                         conn_info(tconn, "sock was reset by peer\n");
513                 else if (rv != -ERESTARTSYS)
514                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
515         } else if (rv == 0) {
516                 if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
517                         long t;
518                         rcu_read_lock();
519                         t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
520                         rcu_read_unlock();
521
522                         t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);
523
524                         if (t)
525                                 goto out;
526                 }
527                 conn_info(tconn, "sock was shut down by peer\n");
528         }
529
530         if (rv != size)
531                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
532
533 out:
534         return rv;
535 }
536
537 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
538 {
539         int err;
540
541         err = drbd_recv(tconn, buf, size);
542         if (err != size) {
543                 if (err >= 0)
544                         err = -EIO;
545         } else
546                 err = 0;
547         return err;
548 }
549
550 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
551 {
552         int err;
553
554         err = drbd_recv_all(tconn, buf, size);
555         if (err && !signal_pending(current))
556                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
557         return err;
558 }
559
560 /* quoting tcp(7):
561  *   On individual connections, the socket buffer size must be set prior to the
562  *   listen(2) or connect(2) calls in order to have it take effect.
563  * This is our wrapper to do so.
564  */
565 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
566                 unsigned int rcv)
567 {
568         /* open coded SO_SNDBUF, SO_RCVBUF */
569         if (snd) {
570                 sock->sk->sk_sndbuf = snd;
571                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
572         }
573         if (rcv) {
574                 sock->sk->sk_rcvbuf = rcv;
575                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
576         }
577 }
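/*
 * Editor's note (sketch, not part of the original file): per the tcp(7)
 * quote above, this only has an effect if it runs before connect() or
 * listen().  The call sites below keep that order, e.g. in
 * drbd_try_connect():
 *
 *	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 *	err = sock->ops->bind(sock, ...);
 *	err = sock->ops->connect(sock, ...);
 *
 * and likewise before listen() in prepare_listen_socket().
 */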
578
579 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
580 {
581         const char *what;
582         struct socket *sock;
583         struct sockaddr_in6 src_in6;
584         struct sockaddr_in6 peer_in6;
585         struct net_conf *nc;
586         int err, peer_addr_len, my_addr_len;
587         int sndbuf_size, rcvbuf_size, connect_int;
588         int disconnect_on_error = 1;
589
590         rcu_read_lock();
591         nc = rcu_dereference(tconn->net_conf);
592         if (!nc) {
593                 rcu_read_unlock();
594                 return NULL;
595         }
596         sndbuf_size = nc->sndbuf_size;
597         rcvbuf_size = nc->rcvbuf_size;
598         connect_int = nc->connect_int;
599         rcu_read_unlock();
600
601         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
602         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
603
604         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
605                 src_in6.sin6_port = 0;
606         else
607                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
608
609         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
610         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
611
612         what = "sock_create_kern";
613         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
614                                SOCK_STREAM, IPPROTO_TCP, &sock);
615         if (err < 0) {
616                 sock = NULL;
617                 goto out;
618         }
619
620         sock->sk->sk_rcvtimeo =
621         sock->sk->sk_sndtimeo = connect_int * HZ;
622         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
623
624        /* explicitly bind to the configured IP as source IP
625         *  for the outgoing connections.
626         *  This is needed for multihomed hosts and to be
627         *  able to use lo: interfaces for drbd.
628         * Make sure to use 0 as port number, so linux selects
629         *  a free one dynamically.
630         */
631         what = "bind before connect";
632         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
633         if (err < 0)
634                 goto out;
635
636         /* connect may fail, peer not yet available.
637          * stay C_WF_CONNECTION, don't go Disconnecting! */
638         disconnect_on_error = 0;
639         what = "connect";
640         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
641
642 out:
643         if (err < 0) {
644                 if (sock) {
645                         sock_release(sock);
646                         sock = NULL;
647                 }
648                 switch (-err) {
649                         /* timeout, busy, signal pending */
650                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
651                 case EINTR: case ERESTARTSYS:
652                         /* peer not (yet) available, network problem */
653                 case ECONNREFUSED: case ENETUNREACH:
654                 case EHOSTDOWN:    case EHOSTUNREACH:
655                         disconnect_on_error = 0;
656                         break;
657                 default:
658                         conn_err(tconn, "%s failed, err = %d\n", what, err);
659                 }
660                 if (disconnect_on_error)
661                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
662         }
663
664         return sock;
665 }
666
667 struct accept_wait_data {
668         struct drbd_tconn *tconn;
669         struct socket *s_listen;
670         struct completion door_bell;
671         void (*original_sk_state_change)(struct sock *sk);
672
673 };
674
675 static void drbd_incoming_connection(struct sock *sk)
676 {
677         struct accept_wait_data *ad = sk->sk_user_data;
678         void (*state_change)(struct sock *sk);
679
680         state_change = ad->original_sk_state_change;
681         if (sk->sk_state == TCP_ESTABLISHED)
682                 complete(&ad->door_bell);
683         state_change(sk);
684 }
685
686 static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
687 {
688         int err, sndbuf_size, rcvbuf_size, my_addr_len;
689         struct sockaddr_in6 my_addr;
690         struct socket *s_listen;
691         struct net_conf *nc;
692         const char *what;
693
694         rcu_read_lock();
695         nc = rcu_dereference(tconn->net_conf);
696         if (!nc) {
697                 rcu_read_unlock();
698                 return -EIO;
699         }
700         sndbuf_size = nc->sndbuf_size;
701         rcvbuf_size = nc->rcvbuf_size;
702         rcu_read_unlock();
703
704         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
705         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
706
707         what = "sock_create_kern";
708         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
709                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
710         if (err) {
711                 s_listen = NULL;
712                 goto out;
713         }
714
715         s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
716         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
717
718         what = "bind before listen";
719         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
720         if (err < 0)
721                 goto out;
722
723         ad->s_listen = s_listen;
724         write_lock_bh(&s_listen->sk->sk_callback_lock);
725         ad->original_sk_state_change = s_listen->sk->sk_state_change;
726         s_listen->sk->sk_state_change = drbd_incoming_connection;
727         s_listen->sk->sk_user_data = ad;
728         write_unlock_bh(&s_listen->sk->sk_callback_lock);
729
730         what = "listen";
731         err = s_listen->ops->listen(s_listen, 5);
732         if (err < 0)
733                 goto out;
734
735         return 0;
736 out:
737         if (s_listen)
738                 sock_release(s_listen);
739         if (err < 0) {
740                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
741                         conn_err(tconn, "%s failed, err = %d\n", what, err);
742                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
743                 }
744         }
745
746         return -EIO;
747 }
748
749 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
750 {
751         write_lock_bh(&sk->sk_callback_lock);
752         sk->sk_state_change = ad->original_sk_state_change;
753         sk->sk_user_data = NULL;
754         write_unlock_bh(&sk->sk_callback_lock);
755 }
756
757 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
758 {
759         int timeo, connect_int, err = 0;
760         struct socket *s_estab = NULL;
761         struct net_conf *nc;
762
763         rcu_read_lock();
764         nc = rcu_dereference(tconn->net_conf);
765         if (!nc) {
766                 rcu_read_unlock();
767                 return NULL;
768         }
769         connect_int = nc->connect_int;
770         rcu_read_unlock();
771
772         timeo = connect_int * HZ;
773         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
774
775         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
776         if (err <= 0)
777                 return NULL;
778
779         err = kernel_accept(ad->s_listen, &s_estab, 0);
780         if (err < 0) {
781                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
782                         conn_err(tconn, "accept failed, err = %d\n", err);
783                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
784                 }
785         }
786
787         if (s_estab)
788                 unregister_state_change(s_estab->sk, ad);
789
790         return s_estab;
791 }
792
793 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
794
795 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
796                              enum drbd_packet cmd)
797 {
798         if (!conn_prepare_command(tconn, sock))
799                 return -EIO;
800         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
801 }
802
803 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
804 {
805         unsigned int header_size = drbd_header_size(tconn);
806         struct packet_info pi;
807         int err;
808
809         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
810         if (err != header_size) {
811                 if (err >= 0)
812                         err = -EIO;
813                 return err;
814         }
815         err = decode_header(tconn, tconn->data.rbuf, &pi);
816         if (err)
817                 return err;
818         return pi.cmd;
819 }
820
821 /**
822  * drbd_socket_okay() - Free the socket if its connection is not okay
823  * @sock:       pointer to the pointer to the socket.
824  */
825 static int drbd_socket_okay(struct socket **sock)
826 {
827         int rr;
828         char tb[4];
829
830         if (!*sock)
831                 return false;
832
833         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
834
835         if (rr > 0 || rr == -EAGAIN) {
836                 return true;
837         } else {
838                 sock_release(*sock);
839                 *sock = NULL;
840                 return false;
841         }
842 }
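/*
 * Editor's note: the MSG_DONTWAIT | MSG_PEEK receive above acts as a cheap
 * liveness probe.  On a healthy socket it returns either > 0 (data already
 * queued) or -EAGAIN (connected but idle); any other result means the peer
 * went away, so the socket is released and false is returned.
 */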
843 /* Gets called if a connection is established, or if a new minor gets created
844    in a connection */
845 int drbd_connected(struct drbd_conf *mdev)
846 {
847         int err;
848
849         atomic_set(&mdev->packet_seq, 0);
850         mdev->peer_seq = 0;
851
852         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
853                 &mdev->tconn->cstate_mutex :
854                 &mdev->own_state_mutex;
855
856         err = drbd_send_sync_param(mdev);
857         if (!err)
858                 err = drbd_send_sizes(mdev, 0, 0);
859         if (!err)
860                 err = drbd_send_uuids(mdev);
861         if (!err)
862                 err = drbd_send_current_state(mdev);
863         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
864         clear_bit(RESIZE_PENDING, &mdev->flags);
865         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
866         return err;
867 }
868
869 /*
870  * return values:
871  *   1 yes, we have a valid connection
872  *   0 oops, did not work out, please try again
873  *  -1 peer talks different language,
874  *     no point in trying again, please go standalone.
875  *  -2 We do not have a network config...
876  */
877 static int conn_connect(struct drbd_tconn *tconn)
878 {
879         struct drbd_socket sock, msock;
880         struct drbd_conf *mdev;
881         struct net_conf *nc;
882         int vnr, timeout, h, ok;
883         bool discard_my_data;
884         enum drbd_state_rv rv;
885         struct accept_wait_data ad = {
886                 .tconn = tconn,
887                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
888         };
889
890         clear_bit(DISCONNECT_SENT, &tconn->flags);
891         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
892                 return -2;
893
894         mutex_init(&sock.mutex);
895         sock.sbuf = tconn->data.sbuf;
896         sock.rbuf = tconn->data.rbuf;
897         sock.socket = NULL;
898         mutex_init(&msock.mutex);
899         msock.sbuf = tconn->meta.sbuf;
900         msock.rbuf = tconn->meta.rbuf;
901         msock.socket = NULL;
902
903         /* Assume that the peer only understands protocol 80 until we know better.  */
904         tconn->agreed_pro_version = 80;
905
906         if (prepare_listen_socket(tconn, &ad))
907                 return 0;
908
909         do {
910                 struct socket *s;
911
912                 s = drbd_try_connect(tconn);
913                 if (s) {
914                         if (!sock.socket) {
915                                 sock.socket = s;
916                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
917                         } else if (!msock.socket) {
918                                 clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
919                                 msock.socket = s;
920                                 send_first_packet(tconn, &msock, P_INITIAL_META);
921                         } else {
922                                 conn_err(tconn, "Logic error in conn_connect()\n");
923                                 goto out_release_sockets;
924                         }
925                 }
926
927                 if (sock.socket && msock.socket) {
928                         rcu_read_lock();
929                         nc = rcu_dereference(tconn->net_conf);
930                         timeout = nc->ping_timeo * HZ / 10;
931                         rcu_read_unlock();
932                         schedule_timeout_interruptible(timeout);
933                         ok = drbd_socket_okay(&sock.socket);
934                         ok = drbd_socket_okay(&msock.socket) && ok;
935                         if (ok)
936                                 break;
937                 }
938
939 retry:
940                 s = drbd_wait_for_connect(tconn, &ad);
941                 if (s) {
942                         int fp = receive_first_packet(tconn, s);
943                         drbd_socket_okay(&sock.socket);
944                         drbd_socket_okay(&msock.socket);
945                         switch (fp) {
946                         case P_INITIAL_DATA:
947                                 if (sock.socket) {
948                                         conn_warn(tconn, "initial packet S crossed\n");
949                                         sock_release(sock.socket);
950                                         sock.socket = s;
951                                         goto randomize;
952                                 }
953                                 sock.socket = s;
954                                 break;
955                         case P_INITIAL_META:
956                                 set_bit(RESOLVE_CONFLICTS, &tconn->flags);
957                                 if (msock.socket) {
958                                         conn_warn(tconn, "initial packet M crossed\n");
959                                         sock_release(msock.socket);
960                                         msock.socket = s;
961                                         goto randomize;
962                                 }
963                                 msock.socket = s;
964                                 break;
965                         default:
966                                 conn_warn(tconn, "Error receiving initial packet\n");
967                                 sock_release(s);
968 randomize:
969                                 if (random32() & 1)
970                                         goto retry;
971                         }
972                 }
973
974                 if (tconn->cstate <= C_DISCONNECTING)
975                         goto out_release_sockets;
976                 if (signal_pending(current)) {
977                         flush_signals(current);
978                         smp_rmb();
979                         if (get_t_state(&tconn->receiver) == EXITING)
980                                 goto out_release_sockets;
981                 }
982
983                 ok = drbd_socket_okay(&sock.socket);
984                 ok = drbd_socket_okay(&msock.socket) && ok;
985         } while (!ok);
986
987         if (ad.s_listen)
988                 sock_release(ad.s_listen);
989
990         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
991         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
992
993         sock.socket->sk->sk_allocation = GFP_NOIO;
994         msock.socket->sk->sk_allocation = GFP_NOIO;
995
996         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
997         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
998
999         /* NOT YET ...
1000          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
1001          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1002          * first set it to the P_CONNECTION_FEATURES timeout,
1003          * which we set to 4x the configured ping_timeout. */
1004         rcu_read_lock();
1005         nc = rcu_dereference(tconn->net_conf);
1006
1007         sock.socket->sk->sk_sndtimeo =
1008         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1009
1010         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1011         timeout = nc->timeout * HZ / 10;
1012         discard_my_data = nc->discard_my_data;
1013         rcu_read_unlock();
1014
1015         msock.socket->sk->sk_sndtimeo = timeout;
1016
1017         /* we don't want delays.
1018          * we use TCP_CORK where appropriate, though */
1019         drbd_tcp_nodelay(sock.socket);
1020         drbd_tcp_nodelay(msock.socket);
1021
1022         tconn->data.socket = sock.socket;
1023         tconn->meta.socket = msock.socket;
1024         tconn->last_received = jiffies;
1025
1026         h = drbd_do_features(tconn);
1027         if (h <= 0)
1028                 return h;
1029
1030         if (tconn->cram_hmac_tfm) {
1031                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
1032                 switch (drbd_do_auth(tconn)) {
1033                 case -1:
1034                         conn_err(tconn, "Authentication of peer failed\n");
1035                         return -1;
1036                 case 0:
1037                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1038                         return 0;
1039                 }
1040         }
1041
1042         tconn->data.socket->sk->sk_sndtimeo = timeout;
1043         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1044
1045         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1046                 return -1;
1047
1048         set_bit(STATE_SENT, &tconn->flags);
1049
1050         rcu_read_lock();
1051         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1052                 kref_get(&mdev->kref);
1053                 rcu_read_unlock();
1054
1055                 if (discard_my_data)
1056                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1057                 else
1058                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1059
1060                 drbd_connected(mdev);
1061                 kref_put(&mdev->kref, &drbd_minor_destroy);
1062                 rcu_read_lock();
1063         }
1064         rcu_read_unlock();
1065
1066         rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1067         if (rv < SS_SUCCESS) {
1068                 clear_bit(STATE_SENT, &tconn->flags);
1069                 return 0;
1070         }
1071
1072         drbd_thread_start(&tconn->asender);
1073
1074         mutex_lock(&tconn->conf_update);
1075         /* The discard_my_data flag is a single-shot modifier to the next
1076          * connection attempt, the handshake of which is now well underway.
1077          * No need for rcu style copying of the whole struct
1078          * just to clear a single value. */
1079         tconn->net_conf->discard_my_data = 0;
1080         mutex_unlock(&tconn->conf_update);
1081
1082         return h;
1083
1084 out_release_sockets:
1085         if (ad.s_listen)
1086                 sock_release(ad.s_listen);
1087         if (sock.socket)
1088                 sock_release(sock.socket);
1089         if (msock.socket)
1090                 sock_release(msock.socket);
1091         return -1;
1092 }
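/*
 * Editor's sketch (not part of the original file): given the return value
 * convention documented above conn_connect(), a caller is expected to
 * retry on 0 and give up on a negative result, roughly:
 *
 *	int h;
 *	do {
 *		h = conn_connect(tconn);
 *	} while (h == 0);
 *	if (h < 0)
 *		... go standalone, do not retry ...
 */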
1093
1094 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1095 {
1096         unsigned int header_size = drbd_header_size(tconn);
1097
1098         if (header_size == sizeof(struct p_header100) &&
1099             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1100                 struct p_header100 *h = header;
1101                 if (h->pad != 0) {
1102                         conn_err(tconn, "Header padding is not zero\n");
1103                         return -EINVAL;
1104                 }
1105                 pi->vnr = be16_to_cpu(h->volume);
1106                 pi->cmd = be16_to_cpu(h->command);
1107                 pi->size = be32_to_cpu(h->length);
1108         } else if (header_size == sizeof(struct p_header95) &&
1109                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1110                 struct p_header95 *h = header;
1111                 pi->cmd = be16_to_cpu(h->command);
1112                 pi->size = be32_to_cpu(h->length);
1113                 pi->vnr = 0;
1114         } else if (header_size == sizeof(struct p_header80) &&
1115                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1116                 struct p_header80 *h = header;
1117                 pi->cmd = be16_to_cpu(h->command);
1118                 pi->size = be16_to_cpu(h->length);
1119                 pi->vnr = 0;
1120         } else {
1121                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1122                          be32_to_cpu(*(__be32 *)header),
1123                          tconn->agreed_pro_version);
1124                 return -EINVAL;
1125         }
1126         pi->data = header + header_size;
1127         return 0;
1128 }
1129
1130 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1131 {
1132         void *buffer = tconn->data.rbuf;
1133         int err;
1134
1135         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1136         if (err)
1137                 return err;
1138
1139         err = decode_header(tconn, buffer, pi);
1140         tconn->last_received = jiffies;
1141
1142         return err;
1143 }
1144
1145 static void drbd_flush(struct drbd_tconn *tconn)
1146 {
1147         int rv;
1148         struct drbd_conf *mdev;
1149         int vnr;
1150
1151         if (tconn->write_ordering >= WO_bdev_flush) {
1152                 rcu_read_lock();
1153                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1154                         if (!get_ldev(mdev))
1155                                 continue;
1156                         kref_get(&mdev->kref);
1157                         rcu_read_unlock();
1158
1159                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1160                                         GFP_NOIO, NULL);
1161                         if (rv) {
1162                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1163                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1164                                  * don't try again for ANY return value != 0
1165                                  * if (rv == -EOPNOTSUPP) */
1166                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1167                         }
1168                         put_ldev(mdev);
1169                         kref_put(&mdev->kref, &drbd_minor_destroy);
1170
1171                         rcu_read_lock();
1172                         if (rv)
1173                                 break;
1174                 }
1175                 rcu_read_unlock();
1176         }
1177 }
1178
1179 /**
1180  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, and possibly finishes it.
1181  * @tconn:      DRBD connection.
1182  * @epoch:      Epoch object.
1183  * @ev:         Epoch event.
1184  */
1185 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1186                                                struct drbd_epoch *epoch,
1187                                                enum epoch_event ev)
1188 {
1189         int epoch_size;
1190         struct drbd_epoch *next_epoch;
1191         enum finish_epoch rv = FE_STILL_LIVE;
1192
1193         spin_lock(&tconn->epoch_lock);
1194         do {
1195                 next_epoch = NULL;
1196
1197                 epoch_size = atomic_read(&epoch->epoch_size);
1198
1199                 switch (ev & ~EV_CLEANUP) {
1200                 case EV_PUT:
1201                         atomic_dec(&epoch->active);
1202                         break;
1203                 case EV_GOT_BARRIER_NR:
1204                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1205                         break;
1206                 case EV_BECAME_LAST:
1207                         /* nothing to do */
1208                         break;
1209                 }
1210
1211                 if (epoch_size != 0 &&
1212                     atomic_read(&epoch->active) == 0 &&
1213                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1214                         if (!(ev & EV_CLEANUP)) {
1215                                 spin_unlock(&tconn->epoch_lock);
1216                                 drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1217                                 spin_lock(&tconn->epoch_lock);
1218                         }
1219 #if 0
1220                         /* FIXME: dec unacked on connection, once we have
1221                          * something to count pending connection packets in. */
1222                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1223                                 dec_unacked(epoch->tconn);
1224 #endif
1225
1226                         if (tconn->current_epoch != epoch) {
1227                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1228                                 list_del(&epoch->list);
1229                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1230                                 tconn->epochs--;
1231                                 kfree(epoch);
1232
1233                                 if (rv == FE_STILL_LIVE)
1234                                         rv = FE_DESTROYED;
1235                         } else {
1236                                 epoch->flags = 0;
1237                                 atomic_set(&epoch->epoch_size, 0);
1238                                 /* atomic_set(&epoch->active, 0); is already zero */
1239                                 if (rv == FE_STILL_LIVE)
1240                                         rv = FE_RECYCLED;
1241                         }
1242                 }
1243
1244                 if (!next_epoch)
1245                         break;
1246
1247                 epoch = next_epoch;
1248         } while (1);
1249
1250         spin_unlock(&tconn->epoch_lock);
1251
1252         return rv;
1253 }
1254
1255 /**
1256  * drbd_bump_write_ordering() - Fall back to another write ordering method
1257  * @tconn:      DRBD connection.
1258  * @wo:         Write ordering method to try.
1259  */
1260 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1261 {
1262         struct disk_conf *dc;
1263         struct drbd_conf *mdev;
1264         enum write_ordering_e pwo;
1265         int vnr;
1266         static char *write_ordering_str[] = {
1267                 [WO_none] = "none",
1268                 [WO_drain_io] = "drain",
1269                 [WO_bdev_flush] = "flush",
1270         };
1271
1272         pwo = tconn->write_ordering;
1273         wo = min(pwo, wo);
1274         rcu_read_lock();
1275         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1276                 if (!get_ldev_if_state(mdev, D_ATTACHING))
1277                         continue;
1278                 dc = rcu_dereference(mdev->ldev->disk_conf);
1279
1280                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1281                         wo = WO_drain_io;
1282                 if (wo == WO_drain_io && !dc->disk_drain)
1283                         wo = WO_none;
1284                 put_ldev(mdev);
1285         }
1286         rcu_read_unlock();
1287         tconn->write_ordering = wo;
1288         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1289                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1290 }
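/*
 * Editor's example (not part of the original file): the fallback is
 * monotonic, the connection-wide method can only get weaker.  With a
 * volume whose disk_conf has disk_flushes disabled, asking for the
 * strongest method degrades one step:
 *
 *	drbd_bump_write_ordering(tconn, WO_bdev_flush);
 *	(tconn->write_ordering is now at most WO_drain_io)
 *
 * and with disk_drain disabled as well it ends up at WO_none.
 */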
1291
1292 /**
1293  * drbd_submit_peer_request() - Submit the peer request to the local backing device
1294  * @mdev:       DRBD device.
1295  * @peer_req:   peer request
1296  * @rw:         flag field, see bio->bi_rw
1297  *
1298  * May spread the pages to multiple bios,
1299  * depending on bio_add_page restrictions.
1300  *
1301  * Returns 0 if all bios have been submitted,
1302  * -ENOMEM if we could not allocate enough bios,
1303  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1304  *  single page to an empty bio (which should never happen and likely indicates
1305  *  that the lower level IO stack is in some way broken). This has been observed
1306  *  on certain Xen deployments.
1307  */
1308 /* TODO allocate from our own bio_set. */
1309 int drbd_submit_peer_request(struct drbd_conf *mdev,
1310                              struct drbd_peer_request *peer_req,
1311                              const unsigned rw, const int fault_type)
1312 {
1313         struct bio *bios = NULL;
1314         struct bio *bio;
1315         struct page *page = peer_req->pages;
1316         sector_t sector = peer_req->i.sector;
1317         unsigned ds = peer_req->i.size;
1318         unsigned n_bios = 0;
1319         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1320         int err = -ENOMEM;
1321
1322         /* In most cases, we will only need one bio.  But in case the lower
1323          * level restrictions happen to be different at this offset on this
1324          * side than those of the sending peer, we may need to submit the
1325          * request in more than one bio.
1326          *
1327          * Plain bio_alloc is good enough here, this is no DRBD internally
1328          * generated bio, but a bio allocated on behalf of the peer.
1329          */
1330 next_bio:
1331         bio = bio_alloc(GFP_NOIO, nr_pages);
1332         if (!bio) {
1333                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1334                 goto fail;
1335         }
1336         /* > peer_req->i.sector, unless this is the first bio */
1337         bio->bi_sector = sector;
1338         bio->bi_bdev = mdev->ldev->backing_bdev;
1339         bio->bi_rw = rw;
1340         bio->bi_private = peer_req;
1341         bio->bi_end_io = drbd_peer_request_endio;
1342
1343         bio->bi_next = bios;
1344         bios = bio;
1345         ++n_bios;
1346
1347         page_chain_for_each(page) {
1348                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1349                 if (!bio_add_page(bio, page, len, 0)) {
1350                         /* A single page must always be possible!
1351                          * But in case it fails anyway,
1352                          * we deal with it, and complain (below). */
1353                         if (bio->bi_vcnt == 0) {
1354                                 dev_err(DEV,
1355                                         "bio_add_page failed for len=%u, "
1356                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1357                                         len, (unsigned long long)bio->bi_sector);
1358                                 err = -ENOSPC;
1359                                 goto fail;
1360                         }
1361                         goto next_bio;
1362                 }
1363                 ds -= len;
1364                 sector += len >> 9;
1365                 --nr_pages;
1366         }
1367         D_ASSERT(page == NULL);
1368         D_ASSERT(ds == 0);
1369
1370         atomic_set(&peer_req->pending_bios, n_bios);
1371         do {
1372                 bio = bios;
1373                 bios = bios->bi_next;
1374                 bio->bi_next = NULL;
1375
1376                 drbd_generic_make_request(mdev, fault_type, bio);
1377         } while (bios);
1378         return 0;
1379
1380 fail:
1381         while (bios) {
1382                 bio = bios;
1383                 bios = bios->bi_next;
1384                 bio_put(bio);
1385         }
1386         return err;
1387 }
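/*
 * Editor's sketch of a typical calling sequence for a write (hedged; the
 * receive path in this file goes through read_in_block() rather than
 * calling drbd_alloc_peer_req() directly):
 *
 *	peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
 *	if (!peer_req)
 *		goto fail;
 *	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_DT_WR) == 0)
 *		return 0;
 *	(on error, undo any bookkeeping and drbd_free_peer_req() it)
 */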
1388
1389 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1390                                              struct drbd_peer_request *peer_req)
1391 {
1392         struct drbd_interval *i = &peer_req->i;
1393
1394         drbd_remove_interval(&mdev->write_requests, i);
1395         drbd_clear_interval(i);
1396
1397         /* Wake up any processes waiting for this peer request to complete.  */
1398         if (i->waiting)
1399                 wake_up(&mdev->misc_wait);
1400 }
1401
1402 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1403 {
1404         struct drbd_conf *mdev;
1405         int vnr;
1406
1407         rcu_read_lock();
1408         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1409                 kref_get(&mdev->kref);
1410                 rcu_read_unlock();
1411                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1412                 kref_put(&mdev->kref, &drbd_minor_destroy);
1413                 rcu_read_lock();
1414         }
1415         rcu_read_unlock();
1416 }
1417
1418 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1419 {
1420         int rv;
1421         struct p_barrier *p = pi->data;
1422         struct drbd_epoch *epoch;
1423
1424         /* FIXME these are unacked on connection,
1425          * not a specific (peer)device.
1426          */
1427         tconn->current_epoch->barrier_nr = p->barrier;
1428         tconn->current_epoch->tconn = tconn;
1429         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1430
1431         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1432          * the activity log, which means it would not be resynced in case the
1433          * R_PRIMARY crashes now.
1434          * Therefore we must send the barrier_ack after the barrier request was
1435          * completed. */
1436         switch (tconn->write_ordering) {
1437         case WO_none:
1438                 if (rv == FE_RECYCLED)
1439                         return 0;
1440
1441                 /* receiver context, in the writeout path of the other node.
1442                  * avoid potential distributed deadlock */
1443                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1444                 if (epoch)
1445                         break;
1446                 else
1447                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1448                         /* Fall through */
1449
1450         case WO_bdev_flush:
1451         case WO_drain_io:
1452                 conn_wait_active_ee_empty(tconn);
1453                 drbd_flush(tconn);
1454
1455                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1456                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1457                         if (epoch)
1458                                 break;
1459                 }
1460
1461                 return 0;
1462         default:
1463                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1464                 return -EIO;
1465         }
1466
1467         epoch->flags = 0;
1468         atomic_set(&epoch->epoch_size, 0);
1469         atomic_set(&epoch->active, 0);
1470
1471         spin_lock(&tconn->epoch_lock);
1472         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1473                 list_add(&epoch->list, &tconn->current_epoch->list);
1474                 tconn->current_epoch = epoch;
1475                 tconn->epochs++;
1476         } else {
1477                 /* The current_epoch got recycled while we allocated this one... */
1478                 kfree(epoch);
1479         }
1480         spin_unlock(&tconn->epoch_lock);
1481
1482         return 0;
1483 }
1484
1485 /* used from receive_RSDataReply (recv_resync_read)
1486  * and from receive_Data */
1487 static struct drbd_peer_request *
1488 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1489               int data_size) __must_hold(local)
1490 {
1491         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1492         struct drbd_peer_request *peer_req;
1493         struct page *page;
1494         int dgs, ds, err;
1495         void *dig_in = mdev->tconn->int_dig_in;
1496         void *dig_vv = mdev->tconn->int_dig_vv;
1497         unsigned long *data;
1498
1499         dgs = 0;
1500         if (mdev->tconn->peer_integrity_tfm) {
1501                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1502                 /*
1503                  * FIXME: Receive the incoming digest into the receive buffer
1504                  *        here, together with its struct p_data?
1505                  */
1506                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1507                 if (err)
1508                         return NULL;
1509                 data_size -= dgs;
1510         }
1511
1512         if (!expect(IS_ALIGNED(data_size, 512)))
1513                 return NULL;
1514         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1515                 return NULL;
1516
1517         /* even though we trust our peer,
1518          * we sometimes have to double check. */
1519         if (sector + (data_size>>9) > capacity) {
1520                 dev_err(DEV, "request from peer beyond end of local disk: "
1521                         "capacity: %llus < sector: %llus + size: %u\n",
1522                         (unsigned long long)capacity,
1523                         (unsigned long long)sector, data_size);
1524                 return NULL;
1525         }
1526
1527         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1528          * "criss-cross" setup, that might cause write-out on some other DRBD,
1529          * which in turn might block on the other node at this very place.  */
1530         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1531         if (!peer_req)
1532                 return NULL;
1533
1534         if (!data_size)
1535                 return peer_req;
1536
1537         ds = data_size;
1538         page = peer_req->pages;
1539         page_chain_for_each(page) {
1540                 unsigned len = min_t(int, ds, PAGE_SIZE);
1541                 data = kmap(page);
1542                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1543                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1544                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1545                         data[0] = data[0] ^ (unsigned long)-1;
1546                 }
1547                 kunmap(page);
1548                 if (err) {
1549                         drbd_free_peer_req(mdev, peer_req);
1550                         return NULL;
1551                 }
1552                 ds -= len;
1553         }
1554
1555         if (dgs) {
1556                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1557                 if (memcmp(dig_in, dig_vv, dgs)) {
1558                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1559                                 (unsigned long long)sector, data_size);
1560                         drbd_free_peer_req(mdev, peer_req);
1561                         return NULL;
1562                 }
1563         }
1564         mdev->recv_cnt += data_size>>9;
1565         return peer_req;
1566 }
1567
1568 /* drbd_drain_block() just takes a data block
1569  * out of the socket input buffer, and discards it.
1570  */
1571 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1572 {
1573         struct page *page;
1574         int err = 0;
1575         void *data;
1576
1577         if (!data_size)
1578                 return 0;
1579
1580         page = drbd_alloc_pages(mdev, 1, 1);
1581
1582         data = kmap(page);
1583         while (data_size) {
1584                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1585
1586                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1587                 if (err)
1588                         break;
1589                 data_size -= len;
1590         }
1591         kunmap(page);
1592         drbd_free_pages(mdev, page, 0);
1593         return err;
1594 }
1595
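     /*
      * "Diskless" read: the peer sent the data for one of our own read
      * requests; copy it directly into the pages of the request's master bio
      * and verify the integrity digest, if one is configured.
      */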
1596 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1597                            sector_t sector, int data_size)
1598 {
1599         struct bio_vec *bvec;
1600         struct bio *bio;
1601         int dgs, err, i, expect;
1602         void *dig_in = mdev->tconn->int_dig_in;
1603         void *dig_vv = mdev->tconn->int_dig_vv;
1604
1605         dgs = 0;
1606         if (mdev->tconn->peer_integrity_tfm) {
1607                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1608                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1609                 if (err)
1610                         return err;
1611                 data_size -= dgs;
1612         }
1613
1614         /* optimistically update recv_cnt.  if receiving fails below,
1615          * we disconnect anyways, and counters will be reset. */
1616         mdev->recv_cnt += data_size>>9;
1617
1618         bio = req->master_bio;
1619         D_ASSERT(sector == bio->bi_sector);
1620
1621         bio_for_each_segment(bvec, bio, i) {
1622                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1623                 expect = min_t(int, data_size, bvec->bv_len);
1624                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1625                 kunmap(bvec->bv_page);
1626                 if (err)
1627                         return err;
1628                 data_size -= expect;
1629         }
1630
1631         if (dgs) {
1632                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1633                 if (memcmp(dig_in, dig_vv, dgs)) {
1634                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1635                         return -EINVAL;
1636                 }
1637         }
1638
1639         D_ASSERT(data_size == 0);
1640         return 0;
1641 }
1642
1643 /*
1644  * e_end_resync_block() is called in asender context via
1645  * drbd_finish_peer_reqs().
1646  */
1647 static int e_end_resync_block(struct drbd_work *w, int unused)
1648 {
1649         struct drbd_peer_request *peer_req =
1650                 container_of(w, struct drbd_peer_request, w);
1651         struct drbd_conf *mdev = w->mdev;
1652         sector_t sector = peer_req->i.sector;
1653         int err;
1654
1655         D_ASSERT(drbd_interval_empty(&peer_req->i));
1656
1657         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1658                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1659                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1660         } else {
1661                 /* Record failure to sync */
1662                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1663
1664                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1665         }
1666         dec_unacked(mdev);
1667
1668         return err;
1669 }
1670
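     /*
      * Read one resync data block from the socket and submit it as a write to
      * the local disk; the ack is sent from e_end_resync_block() once the
      * write has completed.
      */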
1671 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1672 {
1673         struct drbd_peer_request *peer_req;
1674
1675         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1676         if (!peer_req)
1677                 goto fail;
1678
1679         dec_rs_pending(mdev);
1680
1681         inc_unacked(mdev);
1682         /* corresponding dec_unacked() in e_end_resync_block()
1683          * respective _drbd_clear_done_ee */
1684
1685         peer_req->w.cb = e_end_resync_block;
1686
1687         spin_lock_irq(&mdev->tconn->req_lock);
1688         list_add(&peer_req->w.list, &mdev->sync_ee);
1689         spin_unlock_irq(&mdev->tconn->req_lock);
1690
1691         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1692         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1693                 return 0;
1694
1695         /* don't care for the reason here */
1696         dev_err(DEV, "submit failed, triggering re-connect\n");
1697         spin_lock_irq(&mdev->tconn->req_lock);
1698         list_del(&peer_req->w.list);
1699         spin_unlock_irq(&mdev->tconn->req_lock);
1700
1701         drbd_free_peer_req(mdev, peer_req);
1702 fail:
1703         put_ldev(mdev);
1704         return -EIO;
1705 }
1706
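     /*
      * The peer echoes back the pointer we used as block_id.  Before trusting
      * it, verify that it really is a known request covering this sector by
      * looking it up in the given interval tree.
      */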
1707 static struct drbd_request *
1708 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1709              sector_t sector, bool missing_ok, const char *func)
1710 {
1711         struct drbd_request *req;
1712
1713         /* Request object according to our peer */
1714         req = (struct drbd_request *)(unsigned long)id;
1715         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1716                 return req;
1717         if (!missing_ok) {
1718                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1719                         (unsigned long)id, (unsigned long long)sector);
1720         }
1721         return NULL;
1722 }
1723
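     /*
      * P_DATA_REPLY: the peer answered one of our application reads; copy the
      * payload into the bio of the original request.
      */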
1724 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1725 {
1726         struct drbd_conf *mdev;
1727         struct drbd_request *req;
1728         sector_t sector;
1729         int err;
1730         struct p_data *p = pi->data;
1731
1732         mdev = vnr_to_mdev(tconn, pi->vnr);
1733         if (!mdev)
1734                 return -EIO;
1735
1736         sector = be64_to_cpu(p->sector);
1737
1738         spin_lock_irq(&mdev->tconn->req_lock);
1739         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1740         spin_unlock_irq(&mdev->tconn->req_lock);
1741         if (unlikely(!req))
1742                 return -EIO;
1743
1744         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1745          * special casing it there for the various failure cases.
1746          * still no race with drbd_fail_pending_reads */
1747         err = recv_dless_read(mdev, req, sector, pi->size);
1748         if (!err)
1749                 req_mod(req, DATA_RECEIVED);
1750         /* else: nothing. handled from drbd_disconnect...
1751          * I don't think we may complete this just yet
1752          * in case we are "on-disconnect: freeze" */
1753
1754         return err;
1755 }
1756
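     /*
      * P_RS_DATA_REPLY: resync data we requested has arrived; write it to the
      * local disk, or drain and negatively acknowledge it if we have no disk.
      */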
1757 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1758 {
1759         struct drbd_conf *mdev;
1760         sector_t sector;
1761         int err;
1762         struct p_data *p = pi->data;
1763
1764         mdev = vnr_to_mdev(tconn, pi->vnr);
1765         if (!mdev)
1766                 return -EIO;
1767
1768         sector = be64_to_cpu(p->sector);
1769         D_ASSERT(p->block_id == ID_SYNCER);
1770
1771         if (get_ldev(mdev)) {
1772                 /* data is submitted to disk within recv_resync_read.
1773                  * corresponding put_ldev done below on error,
1774                  * or in drbd_peer_request_endio. */
1775                 err = recv_resync_read(mdev, sector, pi->size);
1776         } else {
1777                 if (__ratelimit(&drbd_ratelimit_state))
1778                         dev_err(DEV, "Can not write resync data to local disk.\n");
1779
1780                 err = drbd_drain_block(mdev, pi->size);
1781
1782                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1783         }
1784
1785         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1786
1787         return err;
1788 }
1789
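     /*
      * Requeue postponed local writes that overlap the given area, now that
      * the conflicting peer write has completed.
      */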
1790 static void restart_conflicting_writes(struct drbd_conf *mdev,
1791                                        sector_t sector, int size)
1792 {
1793         struct drbd_interval *i;
1794         struct drbd_request *req;
1795
1796         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1797                 if (!i->local)
1798                         continue;
1799                 req = container_of(i, struct drbd_request, i);
1800                 if (req->rq_state & RQ_LOCAL_PENDING ||
1801                     !(req->rq_state & RQ_POSTPONED))
1802                         continue;
1803                 /* as it is RQ_POSTPONED, this will cause it to
1804                  * be queued on the retry workqueue. */
1805                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1806         }
1807 }
1808
1809 /*
1810  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1811  */
1812 static int e_end_block(struct drbd_work *w, int cancel)
1813 {
1814         struct drbd_peer_request *peer_req =
1815                 container_of(w, struct drbd_peer_request, w);
1816         struct drbd_conf *mdev = w->mdev;
1817         sector_t sector = peer_req->i.sector;
1818         int err = 0, pcmd;
1819
1820         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1821                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1822                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1823                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1824                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1825                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1826                         err = drbd_send_ack(mdev, pcmd, peer_req);
1827                         if (pcmd == P_RS_WRITE_ACK)
1828                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1829                 } else {
1830                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1831                         /* we expect it to be marked out of sync anyways...
1832                          * maybe assert this?  */
1833                 }
1834                 dec_unacked(mdev);
1835         }
1836         /* we delete from the conflict detection hash _after_ we sent out the
1837          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1838         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1839                 spin_lock_irq(&mdev->tconn->req_lock);
1840                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1841                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1842                 if (peer_req->flags & EE_RESTART_REQUESTS)
1843                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1844                 spin_unlock_irq(&mdev->tconn->req_lock);
1845         } else
1846                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1847
1848         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1849
1850         return err;
1851 }
1852
1853 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1854 {
1855         struct drbd_conf *mdev = w->mdev;
1856         struct drbd_peer_request *peer_req =
1857                 container_of(w, struct drbd_peer_request, w);
1858         int err;
1859
1860         err = drbd_send_ack(mdev, ack, peer_req);
1861         dec_unacked(mdev);
1862
1863         return err;
1864 }
1865
1866 static int e_send_superseded(struct drbd_work *w, int unused)
1867 {
1868         return e_send_ack(w, P_SUPERSEDED);
1869 }
1870
1871 static int e_send_retry_write(struct drbd_work *w, int unused)
1872 {
1873         struct drbd_tconn *tconn = w->mdev->tconn;
1874
1875         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1876                              P_RETRY_WRITE : P_SUPERSEDED);
1877 }
1878
1879 static bool seq_greater(u32 a, u32 b)
1880 {
1881         /*
1882          * We assume 32-bit wrap-around here.
1883          * For 24-bit wrap-around, we would have to shift:
1884          *  a <<= 8; b <<= 8;
1885          */
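             /* e.g. seq_greater(2, 0xffffffff) is true: (s32)(2 - 0xffffffff) == 3 > 0 */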
1886         return (s32)a - (s32)b > 0;
1887 }
1888
1889 static u32 seq_max(u32 a, u32 b)
1890 {
1891         return seq_greater(a, b) ? a : b;
1892 }
1893
1894 static bool need_peer_seq(struct drbd_conf *mdev)
1895 {
1896         struct drbd_tconn *tconn = mdev->tconn;
1897         int tp;
1898
1899         /*
1900          * We only need to keep track of the last packet_seq number of our peer
1901          * if we are in dual-primary mode and we have the resolve-conflicts flag set; see
1902          * handle_write_conflicts().
1903          */
1904
1905         rcu_read_lock();
1906         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1907         rcu_read_unlock();
1908
1909         return tp && test_bit(RESOLVE_CONFLICTS, &tconn->flags);
1910 }
1911
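     /*
      * Advance mdev->peer_seq to the highest sequence number seen so far and
      * wake up any writer waiting for it in wait_for_and_update_peer_seq().
      */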
1912 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1913 {
1914         unsigned int newest_peer_seq;
1915
1916         if (need_peer_seq(mdev)) {
1917                 spin_lock(&mdev->peer_seq_lock);
1918                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1919                 mdev->peer_seq = newest_peer_seq;
1920                 spin_unlock(&mdev->peer_seq_lock);
1921                 /* wake up only if we actually changed mdev->peer_seq */
1922                 if (peer_seq == newest_peer_seq)
1923                         wake_up(&mdev->seq_wait);
1924         }
1925 }
1926
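     /* s1/s2 are sector numbers, l1/l2 are lengths in bytes;
      * true if the two ranges intersect. */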
1927 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1928 {
1929         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1930 }
1931
1932 /* maybe change sync_ee into interval trees as well? */
1933 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1934 {
1935         struct drbd_peer_request *rs_req;
1936         bool rv = 0;
1937
1938         spin_lock_irq(&mdev->tconn->req_lock);
1939         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1940                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1941                              rs_req->i.sector, rs_req->i.size)) {
1942                         rv = 1;
1943                         break;
1944                 }
1945         }
1946         spin_unlock_irq(&mdev->tconn->req_lock);
1947
1948         return rv;
1949 }
1950
1951 /* Called from receive_Data.
1952  * Synchronize packets on sock with packets on msock.
1953  *
1954  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1955  * packet traveling on msock, they are still processed in the order they have
1956  * been sent.
1957  *
1958  * Note: we don't care for Ack packets overtaking P_DATA packets.
1959  *
1960  * In case packet_seq is larger than mdev->peer_seq number, there are
1961  * outstanding packets on the msock. We wait for them to arrive.
1962  * In case we are the logically next packet, we update mdev->peer_seq
1963  * ourselves. Correctly handles 32bit wrap around.
1964  *
1965  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1966  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1967  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1968  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1969  *
1970  * returns 0 if we may process the packet,
1971  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1972 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1973 {
1974         DEFINE_WAIT(wait);
1975         long timeout;
1976         int ret;
1977
1978         if (!need_peer_seq(mdev))
1979                 return 0;
1980
1981         spin_lock(&mdev->peer_seq_lock);
1982         for (;;) {
1983                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1984                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1985                         ret = 0;
1986                         break;
1987                 }
1988                 if (signal_pending(current)) {
1989                         ret = -ERESTARTSYS;
1990                         break;
1991                 }
1992                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1993                 spin_unlock(&mdev->peer_seq_lock);
1994                 rcu_read_lock();
1995                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1996                 rcu_read_unlock();
1997                 timeout = schedule_timeout(timeout);
1998                 spin_lock(&mdev->peer_seq_lock);
1999                 if (!timeout) {
2000                         ret = -ETIMEDOUT;
2001                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
2002                         break;
2003                 }
2004         }
2005         spin_unlock(&mdev->peer_seq_lock);
2006         finish_wait(&mdev->seq_wait, &wait);
2007         return ret;
2008 }
2009
2010 /* see also bio_flags_to_wire()
2011  * Map the wire DP_* flags back to bio REQ_* flags.  We need this semantic
2012  * mapping because we may replicate to other kernel versions. */
2013 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2014 {
2015         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2016                 (dpf & DP_FUA ? REQ_FUA : 0) |
2017                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2018                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2019 }
2020
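     /*
      * Called with the req_lock held: negatively complete every postponed
      * local request overlapping the given area, dropping and re-taking the
      * lock around the completion of each master bio.
      */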
2021 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2022                                     unsigned int size)
2023 {
2024         struct drbd_interval *i;
2025
2026     repeat:
2027         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2028                 struct drbd_request *req;
2029                 struct bio_and_error m;
2030
2031                 if (!i->local)
2032                         continue;
2033                 req = container_of(i, struct drbd_request, i);
2034                 if (!(req->rq_state & RQ_POSTPONED))
2035                         continue;
2036                 req->rq_state &= ~RQ_POSTPONED;
2037                 __req_mod(req, NEG_ACKED, &m);
2038                 spin_unlock_irq(&mdev->tconn->req_lock);
2039                 if (m.bio)
2040                         complete_master_bio(mdev, &m);
2041                 spin_lock_irq(&mdev->tconn->req_lock);
2042                 goto repeat;
2043         }
2044 }
2045
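     /*
      * Insert the peer write into the write_requests tree and resolve
      * conflicts with overlapping requests.  Called with the req_lock held.
      * Returns 0 if the peer request may be submitted, -ENOENT if it was
      * superseded or queued for retry (the ack is then sent by the asender),
      * or another error if waiting for a conflicting request failed.
      */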
2046 static int handle_write_conflicts(struct drbd_conf *mdev,
2047                                   struct drbd_peer_request *peer_req)
2048 {
2049         struct drbd_tconn *tconn = mdev->tconn;
2050         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2051         sector_t sector = peer_req->i.sector;
2052         const unsigned int size = peer_req->i.size;
2053         struct drbd_interval *i;
2054         bool equal;
2055         int err;
2056
2057         /*
2058          * Inserting the peer request into the write_requests tree will prevent
2059          * new conflicting local requests from being added.
2060          */
2061         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2062
2063     repeat:
2064         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2065                 if (i == &peer_req->i)
2066                         continue;
2067
2068                 if (!i->local) {
2069                         /*
2070                          * Our peer has sent a conflicting remote request; this
2071                          * should not happen in a two-node setup.  Wait for the
2072                          * earlier peer request to complete.
2073                          */
2074                         err = drbd_wait_misc(mdev, i);
2075                         if (err)
2076                                 goto out;
2077                         goto repeat;
2078                 }
2079
2080                 equal = i->sector == sector && i->size == size;
2081                 if (resolve_conflicts) {
2082                         /*
2083                          * If the peer request is fully contained within the
2084                          * overlapping request, it can be considered overwritten
2085                          * and thus superseded; otherwise, it will be retried
2086                          * once all overlapping requests have completed.
2087                          */
2088                         bool superseded = i->sector <= sector && i->sector +
2089                                        (i->size >> 9) >= sector + (size >> 9);
2090
2091                         if (!equal)
2092                                 dev_alert(DEV, "Concurrent writes detected: "
2093                                                "local=%llus +%u, remote=%llus +%u, "
2094                                                "assuming %s came first\n",
2095                                           (unsigned long long)i->sector, i->size,
2096                                           (unsigned long long)sector, size,
2097                                           superseded ? "local" : "remote");
2098
2099                         inc_unacked(mdev);
2100                         peer_req->w.cb = superseded ? e_send_superseded :
2101                                                    e_send_retry_write;
2102                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2103                         wake_asender(mdev->tconn);
2104
2105                         err = -ENOENT;
2106                         goto out;
2107                 } else {
2108                         struct drbd_request *req =
2109                                 container_of(i, struct drbd_request, i);
2110
2111                         if (!equal)
2112                                 dev_alert(DEV, "Concurrent writes detected: "
2113                                                "local=%llus +%u, remote=%llus +%u\n",
2114                                           (unsigned long long)i->sector, i->size,
2115                                           (unsigned long long)sector, size);
2116
2117                         if (req->rq_state & RQ_LOCAL_PENDING ||
2118                             !(req->rq_state & RQ_POSTPONED)) {
2119                                 /*
2120                                  * Wait for the node with the discard flag to
2121                                  * decide if this request has been superseded
2122                                  * or needs to be retried.
2123                                  * Requests that have been superseded will
2124                                  * disappear from the write_requests tree.
2125                                  *
2126                                  * In addition, wait for the conflicting
2127                                  * request to finish locally before submitting
2128                                  * the conflicting peer request.
2129                                  */
2130                                 err = drbd_wait_misc(mdev, &req->i);
2131                                 if (err) {
2132                                         _conn_request_state(mdev->tconn,
2133                                                             NS(conn, C_TIMEOUT),
2134                                                             CS_HARD);
2135                                         fail_postponed_requests(mdev, sector, size);
2136                                         goto out;
2137                                 }
2138                                 goto repeat;
2139                         }
2140                         /*
2141                          * Remember to restart the conflicting requests after
2142                          * the new peer request has completed.
2143                          */
2144                         peer_req->flags |= EE_RESTART_REQUESTS;
2145                 }
2146         }
2147         err = 0;
2148
2149     out:
2150         if (err)
2151                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2152         return err;
2153 }
2154
2155 /* mirrored write: a P_DATA packet carries an application write from the peer */
2156 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2157 {
2158         struct drbd_conf *mdev;
2159         sector_t sector;
2160         struct drbd_peer_request *peer_req;
2161         struct p_data *p = pi->data;
2162         u32 peer_seq = be32_to_cpu(p->seq_num);
2163         int rw = WRITE;
2164         u32 dp_flags;
2165         int err, tp;
2166
2167         mdev = vnr_to_mdev(tconn, pi->vnr);
2168         if (!mdev)
2169                 return -EIO;
2170
2171         if (!get_ldev(mdev)) {
2172                 int err2;
2173
2174                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2175                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2176                 atomic_inc(&tconn->current_epoch->epoch_size);
2177                 err2 = drbd_drain_block(mdev, pi->size);
2178                 if (!err)
2179                         err = err2;
2180                 return err;
2181         }
2182
2183         /*
2184          * Corresponding put_ldev done either below (on various errors), or in
2185          * drbd_peer_request_endio, if we successfully submit the data at the
2186          * end of this function.
2187          */
2188
2189         sector = be64_to_cpu(p->sector);
2190         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2191         if (!peer_req) {
2192                 put_ldev(mdev);
2193                 return -EIO;
2194         }
2195
2196         peer_req->w.cb = e_end_block;
2197
2198         dp_flags = be32_to_cpu(p->dp_flags);
2199         rw |= wire_flags_to_bio(mdev, dp_flags);
2200         if (peer_req->pages == NULL) {
2201                 D_ASSERT(peer_req->i.size == 0);
2202                 D_ASSERT(dp_flags & DP_FLUSH);
2203         }
2204
2205         if (dp_flags & DP_MAY_SET_IN_SYNC)
2206                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2207
2208         spin_lock(&tconn->epoch_lock);
2209         peer_req->epoch = tconn->current_epoch;
2210         atomic_inc(&peer_req->epoch->epoch_size);
2211         atomic_inc(&peer_req->epoch->active);
2212         spin_unlock(&tconn->epoch_lock);
2213
2214         rcu_read_lock();
2215         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2216         rcu_read_unlock();
2217         if (tp) {
2218                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2219                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2220                 if (err)
2221                         goto out_interrupted;
2222                 spin_lock_irq(&mdev->tconn->req_lock);
2223                 err = handle_write_conflicts(mdev, peer_req);
2224                 if (err) {
2225                         spin_unlock_irq(&mdev->tconn->req_lock);
2226                         if (err == -ENOENT) {
2227                                 put_ldev(mdev);
2228                                 return 0;
2229                         }
2230                         goto out_interrupted;
2231                 }
2232         } else
2233                 spin_lock_irq(&mdev->tconn->req_lock);
2234         list_add(&peer_req->w.list, &mdev->active_ee);
2235         spin_unlock_irq(&mdev->tconn->req_lock);
2236
2237         if (mdev->state.conn == C_SYNC_TARGET)
2238                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2239
2240         if (mdev->tconn->agreed_pro_version < 100) {
2241                 rcu_read_lock();
2242                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2243                 case DRBD_PROT_C:
2244                         dp_flags |= DP_SEND_WRITE_ACK;
2245                         break;
2246                 case DRBD_PROT_B:
2247                         dp_flags |= DP_SEND_RECEIVE_ACK;
2248                         break;
2249                 }
2250                 rcu_read_unlock();
2251         }
2252
2253         if (dp_flags & DP_SEND_WRITE_ACK) {
2254                 peer_req->flags |= EE_SEND_WRITE_ACK;
2255                 inc_unacked(mdev);
2256                 /* corresponding dec_unacked() in e_end_block()
2257                  * respective _drbd_clear_done_ee */
2258         }
2259
2260         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2261                 /* I really don't like it that the receiver thread
2262                  * sends on the msock, but anyways */
2263                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2264         }
2265
2266         if (mdev->state.pdsk < D_INCONSISTENT) {
2267                 /* In case we have the only disk of the cluster: mark the area out of sync for the peer and cover it with the activity log */
2268                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2269                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2270                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2271                 drbd_al_begin_io(mdev, &peer_req->i);
2272         }
2273
2274         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2275         if (!err)
2276                 return 0;
2277
2278         /* don't care for the reason here */
2279         dev_err(DEV, "submit failed, triggering re-connect\n");
2280         spin_lock_irq(&mdev->tconn->req_lock);
2281         list_del(&peer_req->w.list);
2282         drbd_remove_epoch_entry_interval(mdev, peer_req);
2283         spin_unlock_irq(&mdev->tconn->req_lock);
2284         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2285                 drbd_al_complete_io(mdev, &peer_req->i);
2286
2287 out_interrupted:
2288         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2289         put_ldev(mdev);
2290         drbd_free_peer_req(mdev, peer_req);
2291         return err;
2292 }
2293
2294 /* We may throttle resync, if the lower device seems to be busy,
2295  * and current sync rate is above c_min_rate.
2296  *
2297  * To decide whether or not the lower device is busy, we use a scheme similar
2298  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2299  * amount (more than 64 sectors) of activity we cannot account for with our own resync
2300  * activity, it obviously is "busy".
2301  *
2302  * The current sync rate used here uses only the most recent two step marks,
2303  * to have a short time average so we can react faster.
2304  */
2305 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2306 {
2307         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2308         unsigned long db, dt, dbdt;
2309         struct lc_element *tmp;
2310         int curr_events;
2311         int throttle = 0;
2312         unsigned int c_min_rate;
2313
2314         rcu_read_lock();
2315         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2316         rcu_read_unlock();
2317
2318         /* feature disabled? */
2319         if (c_min_rate == 0)
2320                 return 0;
2321
2322         spin_lock_irq(&mdev->al_lock);
2323         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2324         if (tmp) {
2325                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2326                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2327                         spin_unlock_irq(&mdev->al_lock);
2328                         return 0;
2329                 }
2330                 /* Do not slow down if app IO is already waiting for this extent */
2331         }
2332         spin_unlock_irq(&mdev->al_lock);
2333
2334         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2335                       (int)part_stat_read(&disk->part0, sectors[1]) -
2336                         atomic_read(&mdev->rs_sect_ev);
2337
2338         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2339                 unsigned long rs_left;
2340                 int i;
2341
2342                 mdev->rs_last_events = curr_events;
2343
2344                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2345                  * approx. */
2346                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2347
2348                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2349                         rs_left = mdev->ov_left;
2350                 else
2351                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2352
2353                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2354                 if (!dt)
2355                         dt++;
2356                 db = mdev->rs_mark_left[i] - rs_left;
2357                 dbdt = Bit2KB(db/dt);
2358
2359                 if (dbdt > c_min_rate)
2360                         throttle = 1;
2361         }
2362         return throttle;
2363 }
2364
2365
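     /*
      * Handle the read-type requests from the peer: P_DATA_REQUEST
      * (application read), P_RS_DATA_REQUEST / P_CSUM_RS_REQUEST (resync) and
      * P_OV_REQUEST / P_OV_REPLY (online verify).  The block is read from the
      * local disk and the reply is sent from the corresponding w_e_end_*
      * worker callback.
      */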
2366 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2367 {
2368         struct drbd_conf *mdev;
2369         sector_t sector;
2370         sector_t capacity;
2371         struct drbd_peer_request *peer_req;
2372         struct digest_info *di = NULL;
2373         int size, verb;
2374         unsigned int fault_type;
2375         struct p_block_req *p = pi->data;
2376
2377         mdev = vnr_to_mdev(tconn, pi->vnr);
2378         if (!mdev)
2379                 return -EIO;
2380         capacity = drbd_get_capacity(mdev->this_bdev);
2381
2382         sector = be64_to_cpu(p->sector);
2383         size   = be32_to_cpu(p->blksize);
2384
2385         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2386                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2387                                 (unsigned long long)sector, size);
2388                 return -EINVAL;
2389         }
2390         if (sector + (size>>9) > capacity) {
2391                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2392                                 (unsigned long long)sector, size);
2393                 return -EINVAL;
2394         }
2395
2396         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2397                 verb = 1;
2398                 switch (pi->cmd) {
2399                 case P_DATA_REQUEST:
2400                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2401                         break;
2402                 case P_RS_DATA_REQUEST:
2403                 case P_CSUM_RS_REQUEST:
2404                 case P_OV_REQUEST:
2405                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2406                         break;
2407                 case P_OV_REPLY:
2408                         verb = 0;
2409                         dec_rs_pending(mdev);
2410                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2411                         break;
2412                 default:
2413                         BUG();
2414                 }
2415                 if (verb && __ratelimit(&drbd_ratelimit_state))
2416                         dev_err(DEV, "Can not satisfy peer's read request, "
2417                             "no local data.\n");
2418
2419                 /* drain possible payload */
2420                 return drbd_drain_block(mdev, pi->size);
2421         }
2422
2423         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2424          * "criss-cross" setup, that might cause write-out on some other DRBD,
2425          * which in turn might block on the other node at this very place.  */
2426         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2427         if (!peer_req) {
2428                 put_ldev(mdev);
2429                 return -ENOMEM;
2430         }
2431
2432         switch (pi->cmd) {
2433         case P_DATA_REQUEST:
2434                 peer_req->w.cb = w_e_end_data_req;
2435                 fault_type = DRBD_FAULT_DT_RD;
2436                 /* application IO, don't drbd_rs_begin_io */
2437                 goto submit;
2438
2439         case P_RS_DATA_REQUEST:
2440                 peer_req->w.cb = w_e_end_rsdata_req;
2441                 fault_type = DRBD_FAULT_RS_RD;
2442                 /* used in the sector offset progress display */
2443                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2444                 break;
2445
2446         case P_OV_REPLY:
2447         case P_CSUM_RS_REQUEST:
2448                 fault_type = DRBD_FAULT_RS_RD;
2449                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2450                 if (!di)
2451                         goto out_free_e;
2452
2453                 di->digest_size = pi->size;
2454                 di->digest = (((char *)di)+sizeof(struct digest_info));
2455
2456                 peer_req->digest = di;
2457                 peer_req->flags |= EE_HAS_DIGEST;
2458
2459                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2460                         goto out_free_e;
2461
2462                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2463                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2464                         peer_req->w.cb = w_e_end_csum_rs_req;
2465                         /* used in the sector offset progress display */
2466                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2467                 } else if (pi->cmd == P_OV_REPLY) {
2468                         /* track progress, we may need to throttle */
2469                         atomic_add(size >> 9, &mdev->rs_sect_in);
2470                         peer_req->w.cb = w_e_end_ov_reply;
2471                         dec_rs_pending(mdev);
2472                         /* drbd_rs_begin_io done when we sent this request,
2473                          * but accounting still needs to be done. */
2474                         goto submit_for_resync;
2475                 }
2476                 break;
2477
2478         case P_OV_REQUEST:
2479                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2480                     mdev->tconn->agreed_pro_version >= 90) {
2481                         unsigned long now = jiffies;
2482                         int i;
2483                         mdev->ov_start_sector = sector;
2484                         mdev->ov_position = sector;
2485                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2486                         mdev->rs_total = mdev->ov_left;
2487                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2488                                 mdev->rs_mark_left[i] = mdev->ov_left;
2489                                 mdev->rs_mark_time[i] = now;
2490                         }
2491                         dev_info(DEV, "Online Verify start sector: %llu\n",
2492                                         (unsigned long long)sector);
2493                 }
2494                 peer_req->w.cb = w_e_end_ov_req;
2495                 fault_type = DRBD_FAULT_RS_RD;
2496                 break;
2497
2498         default:
2499                 BUG();
2500         }
2501
2502         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2503          * wrt the receiver, but it is not as straightforward as it may seem.
2504          * Various places in the resync start and stop logic assume resync
2505          * requests are processed in order, requeuing this on the worker thread
2506          * introduces a bunch of new code for synchronization between threads.
2507          *
2508          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2509          * "forever", throttling after drbd_rs_begin_io will lock that extent
2510          * for application writes for the same time.  For now, just throttle
2511          * here, where the rest of the code expects the receiver to sleep for
2512          * a while, anyways.
2513          */
2514
2515         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2516          * this defers syncer requests for some time, before letting at least
2517  * one request through.  The resync controller on the receiving side
2518          * will adapt to the incoming rate accordingly.
2519          *
2520          * We cannot throttle here if remote is Primary/SyncTarget:
2521          * we would also throttle its application reads.
2522          * In that case, throttling is done on the SyncTarget only.
2523          */
2524         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2525                 schedule_timeout_uninterruptible(HZ/10);
2526         if (drbd_rs_begin_io(mdev, sector))
2527                 goto out_free_e;
2528
2529 submit_for_resync:
2530         atomic_add(size >> 9, &mdev->rs_sect_ev);
2531
2532 submit:
2533         inc_unacked(mdev);
2534         spin_lock_irq(&mdev->tconn->req_lock);
2535         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2536         spin_unlock_irq(&mdev->tconn->req_lock);
2537
2538         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2539                 return 0;
2540
2541         /* don't care for the reason here */
2542         dev_err(DEV, "submit failed, triggering re-connect\n");
2543         spin_lock_irq(&mdev->tconn->req_lock);
2544         list_del(&peer_req->w.list);
2545         spin_unlock_irq(&mdev->tconn->req_lock);
2546         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2547
2548 out_free_e:
2549         put_ldev(mdev);
2550         drbd_free_peer_req(mdev, peer_req);
2551         return -EIO;
2552 }
2553
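     /*
      * After-split-brain recovery policy for the case that neither node was
      * primary ("after-sb-0pri").  Return convention, also used by the 1p/2p
      * variants below: 1 = keep our data (become sync source), -1 = discard
      * our data (become sync target), -100 = no automatic resolution.
      */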
2554 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2555 {
2556         int self, peer, rv = -100;
2557         unsigned long ch_self, ch_peer;
2558         enum drbd_after_sb_p after_sb_0p;
2559
2560         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2561         peer = mdev->p_uuid[UI_BITMAP] & 1;
2562
2563         ch_peer = mdev->p_uuid[UI_SIZE];
2564         ch_self = mdev->comm_bm_set;
2565
2566         rcu_read_lock();
2567         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2568         rcu_read_unlock();
2569         switch (after_sb_0p) {
2570         case ASB_CONSENSUS:
2571         case ASB_DISCARD_SECONDARY:
2572         case ASB_CALL_HELPER:
2573         case ASB_VIOLENTLY:
2574                 dev_err(DEV, "Configuration error.\n");
2575                 break;
2576         case ASB_DISCONNECT:
2577                 break;
2578         case ASB_DISCARD_YOUNGER_PRI:
2579                 if (self == 0 && peer == 1) {
2580                         rv = -1;
2581                         break;
2582                 }
2583                 if (self == 1 && peer == 0) {
2584                         rv =  1;
2585                         break;
2586                 }
2587                 /* Else fall through to one of the other strategies... */
2588         case ASB_DISCARD_OLDER_PRI:
2589                 if (self == 0 && peer == 1) {
2590                         rv = 1;
2591                         break;
2592                 }
2593                 if (self == 1 && peer == 0) {
2594                         rv = -1;
2595                         break;
2596                 }
2597                 /* Else fall through to one of the other strategies... */
2598                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2599                      "Using discard-least-changes instead\n");
2600         case ASB_DISCARD_ZERO_CHG:
2601                 if (ch_peer == 0 && ch_self == 0) {
2602                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2603                                 ? -1 : 1;
2604                         break;
2605                 } else {
2606                         if (ch_peer == 0) { rv =  1; break; }
2607                         if (ch_self == 0) { rv = -1; break; }
2608                 }
2609                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2610                         break;
2611         case ASB_DISCARD_LEAST_CHG:
2612                 if      (ch_self < ch_peer)
2613                         rv = -1;
2614                 else if (ch_self > ch_peer)
2615                         rv =  1;
2616                 else /* ( ch_self == ch_peer ) */
2617                      /* Well, then use something else. */
2618                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2619                                 ? -1 : 1;
2620                 break;
2621         case ASB_DISCARD_LOCAL:
2622                 rv = -1;
2623                 break;
2624         case ASB_DISCARD_REMOTE:
2625                 rv =  1;
2626         }
2627
2628         return rv;
2629 }
2630
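     /* Same, for the case that exactly one node was primary ("after-sb-1pri"). */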
2631 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2632 {
2633         int hg, rv = -100;
2634         enum drbd_after_sb_p after_sb_1p;
2635
2636         rcu_read_lock();
2637         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2638         rcu_read_unlock();
2639         switch (after_sb_1p) {
2640         case ASB_DISCARD_YOUNGER_PRI:
2641         case ASB_DISCARD_OLDER_PRI:
2642         case ASB_DISCARD_LEAST_CHG:
2643         case ASB_DISCARD_LOCAL:
2644         case ASB_DISCARD_REMOTE:
2645         case ASB_DISCARD_ZERO_CHG:
2646                 dev_err(DEV, "Configuration error.\n");
2647                 break;
2648         case ASB_DISCONNECT:
2649                 break;
2650         case ASB_CONSENSUS:
2651                 hg = drbd_asb_recover_0p(mdev);
2652                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2653                         rv = hg;
2654                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2655                         rv = hg;
2656                 break;
2657         case ASB_VIOLENTLY:
2658                 rv = drbd_asb_recover_0p(mdev);
2659                 break;
2660         case ASB_DISCARD_SECONDARY:
2661                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2662         case ASB_CALL_HELPER:
2663                 hg = drbd_asb_recover_0p(mdev);
2664                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2665                         enum drbd_state_rv rv2;
2666
2667                         drbd_set_role(mdev, R_SECONDARY, 0);
2668                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2669                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2670                           * we do not need to wait for the after state change work either. */
2671                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2672                         if (rv2 != SS_SUCCESS) {
2673                                 drbd_khelper(mdev, "pri-lost-after-sb");
2674                         } else {
2675                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2676                                 rv = hg;
2677                         }
2678                 } else
2679                         rv = hg;
2680         }
2681
2682         return rv;
2683 }
2684
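     /* Same, for the case that both nodes were primary ("after-sb-2pri"). */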
2685 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2686 {
2687         int hg, rv = -100;
2688         enum drbd_after_sb_p after_sb_2p;
2689
2690         rcu_read_lock();
2691         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2692         rcu_read_unlock();
2693         switch (after_sb_2p) {
2694         case ASB_DISCARD_YOUNGER_PRI:
2695         case ASB_DISCARD_OLDER_PRI:
2696         case ASB_DISCARD_LEAST_CHG:
2697         case ASB_DISCARD_LOCAL:
2698         case ASB_DISCARD_REMOTE:
2699         case ASB_CONSENSUS:
2700         case ASB_DISCARD_SECONDARY:
2701         case ASB_DISCARD_ZERO_CHG:
2702                 dev_err(DEV, "Configuration error.\n");
2703                 break;
2704         case ASB_VIOLENTLY:
2705                 rv = drbd_asb_recover_0p(mdev);
2706                 break;
2707         case ASB_DISCONNECT:
2708                 break;
2709         case ASB_CALL_HELPER:
2710                 hg = drbd_asb_recover_0p(mdev);
2711                 if (hg == -1) {
2712                         enum drbd_state_rv rv2;
2713
2714                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2715                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2716                           * we do not need to wait for the after state change work either. */
2717                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2718                         if (rv2 != SS_SUCCESS) {
2719                                 drbd_khelper(mdev, "pri-lost-after-sb");
2720                         } else {
2721                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2722                                 rv = hg;
2723                         }
2724                 } else
2725                         rv = hg;
2726         }
2727
2728         return rv;
2729 }
2730
2731 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2732                            u64 bits, u64 flags)
2733 {
2734         if (!uuid) {
2735                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2736                 return;
2737         }
2738         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2739              text,
2740              (unsigned long long)uuid[UI_CURRENT],
2741              (unsigned long long)uuid[UI_BITMAP],
2742              (unsigned long long)uuid[UI_HISTORY_START],
2743              (unsigned long long)uuid[UI_HISTORY_END],
2744              (unsigned long long)bits,
2745              (unsigned long long)flags);
2746 }
2747
2748 /*
2749   100   after split brain try auto recover
2750     2   C_SYNC_SOURCE set BitMap
2751     1   C_SYNC_SOURCE use BitMap
2752     0   no Sync
2753    -1   C_SYNC_TARGET use BitMap
2754    -2   C_SYNC_TARGET set BitMap
2755  -100   after split brain, disconnect
2756 -1000   unrelated data
2757 -1091   requires proto 91
2758 -1096   requires proto 96
2759  */
2760 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2761 {
2762         u64 self, peer;
2763         int i, j;
2764
2765         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2766         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2767
2768         *rule_nr = 10;
2769         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2770                 return 0;
2771
2772         *rule_nr = 20;
2773         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2774              peer != UUID_JUST_CREATED)
2775                 return -2;
2776
2777         *rule_nr = 30;
2778         if (self != UUID_JUST_CREATED &&
2779             (peer == UUID_JUST_CREATED || peer == (u64)0))
2780                 return 2;
2781
2782         if (self == peer) {
2783                 int rct, dc; /* roles at crash time */
2784
2785                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2786
2787                         if (mdev->tconn->agreed_pro_version < 91)
2788                                 return -1091;
2789
2790                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2791                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2792                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2793                                 drbd_uuid_move_history(mdev);
2794                                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2795                                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2796
2797                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2798                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2799                                 *rule_nr = 34;
2800                         } else {
2801                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2802                                 *rule_nr = 36;
2803                         }
2804
2805                         return 1;
2806                 }
2807
2808                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2809
2810                         if (mdev->tconn->agreed_pro_version < 91)
2811                                 return -1091;
2812
2813                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2814                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2815                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2816
2817                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2818                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2819                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2820
2821                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2822                                 *rule_nr = 35;
2823                         } else {
2824                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2825                                 *rule_nr = 37;
2826                         }
2827
2828                         return -1;
2829                 }
2830
2831                 /* Common power [off|failure] */
2832                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2833                         (mdev->p_uuid[UI_FLAGS] & 2);
2834                 /* lowest bit is set when we were primary,
2835                  * next bit (weight 2) is set when peer was primary */
2836                 *rule_nr = 40;
2837
2838                 switch (rct) {
2839                 case 0: /* !self_pri && !peer_pri */ return 0;
2840                 case 1: /*  self_pri && !peer_pri */ return 1;
2841                 case 2: /* !self_pri &&  peer_pri */ return -1;
2842                 case 3: /*  self_pri &&  peer_pri */
2843                         dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2844                         return dc ? -1 : 1;
2845                 }
2846         }
2847
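        /* At this point the current UUIDs differ.  The remaining rules look
         * for one side's current UUID among the other side's bitmap and
         * history UUIDs: rules 50/51/60 mean the peer has the newer data
         * (we become sync target), rules 70/71/80 mean we do (sync source),
         * rule 90 (equal bitmap UUIDs) and rule 100 (common ancestor only
         * in the history) are handled as split brain. */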
2848         *rule_nr = 50;
2849         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2850         if (self == peer)
2851                 return -1;
2852
2853         *rule_nr = 51;
2854         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2855         if (self == peer) {
2856                 if (mdev->tconn->agreed_pro_version < 96 ?
2857                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2858                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2859                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2860                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2861                            that the peer's last start of resync as sync source made to its UUIDs. */
2862
2863                         if (mdev->tconn->agreed_pro_version < 91)
2864                                 return -1091;
2865
2866                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2867                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2868
2869                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2870                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2871
2872                         return -1;
2873                 }
2874         }
2875
2876         *rule_nr = 60;
2877         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2878         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2879                 peer = mdev->p_uuid[i] & ~((u64)1);
2880                 if (self == peer)
2881                         return -2;
2882         }
2883
2884         *rule_nr = 70;
2885         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2886         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2887         if (self == peer)
2888                 return 1;
2889
2890         *rule_nr = 71;
2891         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2892         if (self == peer) {
2893                 if (mdev->tconn->agreed_pro_version < 96 ?
2894                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2895                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2896                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2897                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2898                            that our last start of resync as sync source made to our UUIDs. */
2899
2900                         if (mdev->tconn->agreed_pro_version < 91)
2901                                 return -1091;
2902
2903                         __drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2904                         __drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2905
2906                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2907                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2908                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2909
2910                         return 1;
2911                 }
2912         }
2913
2914
2915         *rule_nr = 80;
2916         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2917         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2918                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2919                 if (self == peer)
2920                         return 2;
2921         }
2922
2923         *rule_nr = 90;
2924         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2925         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2926         if (self == peer && self != ((u64)0))
2927                 return 100;
2928
2929         *rule_nr = 100;
2930         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2931                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2932                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2933                         peer = mdev->p_uuid[j] & ~((u64)1);
2934                         if (self == peer)
2935                                 return -100;
2936                 }
2937         }
2938
2939         return -1000;
2940 }
2941
2942 /* drbd_sync_handshake() returns the new conn state on success, or
2943    C_MASK on failure.
2944  */
2945 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2946                                            enum drbd_disk_state peer_disk) __must_hold(local)
2947 {
2948         enum drbd_conns rv = C_MASK;
2949         enum drbd_disk_state mydisk;
2950         struct net_conf *nc;
2951         int hg, rule_nr, rr_conflict, tentative;
2952
2953         mydisk = mdev->state.disk;
2954         if (mydisk == D_NEGOTIATING)
2955                 mydisk = mdev->new_state_tmp.disk;
2956
2957         dev_info(DEV, "drbd_sync_handshake:\n");
2958
2959         spin_lock_irq(&mdev->ldev->md.uuid_lock);
2960         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2961         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2962                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2963
2964         hg = drbd_uuid_compare(mdev, &rule_nr);
2965         spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2966
2967         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2968
2969         if (hg == -1000) {
2970                 dev_alert(DEV, "Unrelated data, aborting!\n");
2971                 return C_MASK;
2972         }
2973         if (hg < -1000) {
2974                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2975                 return C_MASK;
2976         }
2977
2978         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2979             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2980                 int f = (hg == -100) || abs(hg) == 2;
2981                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2982                 if (f)
2983                         hg = hg*2;
2984                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2985                      hg > 0 ? "source" : "target");
2986         }
2987
2988         if (abs(hg) == 100)
2989                 drbd_khelper(mdev, "initial-split-brain");
2990
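        /* hg == 100 (rule 90, equal bitmap UUIDs) always attempts automatic
         * split-brain recovery; hg == -100 does so only if always_asbp is
         * set.  Which after-sb-0pri/1pri/2pri policy applies depends on how
         * many of the two nodes are currently Primary. */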
2991         rcu_read_lock();
2992         nc = rcu_dereference(mdev->tconn->net_conf);
2993
2994         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2995                 int pcount = (mdev->state.role == R_PRIMARY)
2996                            + (peer_role == R_PRIMARY);
2997                 int forced = (hg == -100);
2998
2999                 switch (pcount) {
3000                 case 0:
3001                         hg = drbd_asb_recover_0p(mdev);
3002                         break;
3003                 case 1:
3004                         hg = drbd_asb_recover_1p(mdev);
3005                         break;
3006                 case 2:
3007                         hg = drbd_asb_recover_2p(mdev);
3008                         break;
3009                 }
3010                 if (abs(hg) < 100) {
3011                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
3012                              "automatically solved. Sync from %s node\n",
3013                              pcount, (hg < 0) ? "peer" : "this");
3014                         if (forced) {
3015                                 dev_warn(DEV, "Doing a full sync, since"
3016                                      " UUIDs were ambiguous.\n");
3017                                 hg = hg*2;
3018                         }
3019                 }
3020         }
3021
3022         if (hg == -100) {
3023                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3024                         hg = -1;
3025                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3026                         hg = 1;
3027
3028                 if (abs(hg) < 100)
3029                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3030                              "Sync from %s node\n",
3031                              (hg < 0) ? "peer" : "this");
3032         }
3033         rr_conflict = nc->rr_conflict;
3034         tentative = nc->tentative;
3035         rcu_read_unlock();
3036
3037         if (hg == -100) {
3038                 /* FIXME this log message is not correct if we end up here
3039                  * after an attempted attach on a diskless node.
3040                  * We just refuse to attach -- well, we drop the "connection"
3041                  * to that disk, in a way... */
3042                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3043                 drbd_khelper(mdev, "split-brain");
3044                 return C_MASK;
3045         }
3046
3047         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3048                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3049                 return C_MASK;
3050         }
3051
3052         if (hg < 0 && /* by intention we do not use mydisk here. */
3053             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3054                 switch (rr_conflict) {
3055                 case ASB_CALL_HELPER:
3056                         drbd_khelper(mdev, "pri-lost");
3057                         /* fall through */
3058                 case ASB_DISCONNECT:
3059                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3060                         return C_MASK;
3061                 case ASB_VIOLENTLY:
3062                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3063                              " assumption\n");
3064                 }
3065         }
3066
3067         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3068                 if (hg == 0)
3069                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3070                 else
3071                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3072                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3073                                  abs(hg) >= 2 ? "full" : "bit-map based");
3074                 return C_MASK;
3075         }
3076
3077         if (abs(hg) >= 2) {
3078                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3079                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3080                                         BM_LOCKED_SET_ALLOWED))
3081                         return C_MASK;
3082         }
3083
3084         if (hg > 0) { /* become sync source. */
3085                 rv = C_WF_BITMAP_S;
3086         } else if (hg < 0) { /* become sync target */
3087                 rv = C_WF_BITMAP_T;
3088         } else {
3089                 rv = C_CONNECTED;
3090                 if (drbd_bm_total_weight(mdev)) {
3091                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3092                              drbd_bm_total_weight(mdev));
3093                 }
3094         }
3095
3096         return rv;
3097 }
3098
3099 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3100 {
3101         /* peer ASB_DISCARD_REMOTE paired with our ASB_DISCARD_LOCAL is valid */
3102         if (peer == ASB_DISCARD_REMOTE)
3103                 return ASB_DISCARD_LOCAL;
3104
3105         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3106         if (peer == ASB_DISCARD_LOCAL)
3107                 return ASB_DISCARD_REMOTE;
3108
3109         /* everything else is valid if they are equal on both sides. */
3110         return peer;
3111 }
3112
3113 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3114 {
3115         struct p_protocol *p = pi->data;
3116         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3117         int p_proto, p_discard_my_data, p_two_primaries, cf;
3118         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3119         char integrity_alg[SHARED_SECRET_MAX] = "";
3120         struct crypto_hash *peer_integrity_tfm = NULL;
3121         void *int_dig_in = NULL, *int_dig_vv = NULL;
3122
3123         p_proto         = be32_to_cpu(p->protocol);
3124         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3125         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3126         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3127         p_two_primaries = be32_to_cpu(p->two_primaries);
3128         cf              = be32_to_cpu(p->conn_flags);
3129         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3130
3131         if (tconn->agreed_pro_version >= 87) {
3132                 int err;
3133
3134                 if (pi->size > sizeof(integrity_alg))
3135                         return -EIO;
3136                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3137                 if (err)
3138                         return err;
3139                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3140         }
3141
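        /* A regular P_PROTOCOL must match our own configuration exactly,
         * which is what the checks below verify.  A P_PROTOCOL_UPDATE
         * announces settings that were just changed on the peer, so the
         * checks are skipped and the new values are adopted instead. */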
3142         if (pi->cmd != P_PROTOCOL_UPDATE) {
3143                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3144
3145                 if (cf & CF_DRY_RUN)
3146                         set_bit(CONN_DRY_RUN, &tconn->flags);
3147
3148                 rcu_read_lock();
3149                 nc = rcu_dereference(tconn->net_conf);
3150
3151                 if (p_proto != nc->wire_protocol) {
3152                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3153                         goto disconnect_rcu_unlock;
3154                 }
3155
3156                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3157                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3158                         goto disconnect_rcu_unlock;
3159                 }
3160
3161                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3162                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3163                         goto disconnect_rcu_unlock;
3164                 }
3165
3166                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3167                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3168                         goto disconnect_rcu_unlock;
3169                 }
3170
3171                 if (p_discard_my_data && nc->discard_my_data) {
3172                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3173                         goto disconnect_rcu_unlock;
3174                 }
3175
3176                 if (p_two_primaries != nc->two_primaries) {
3177                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3178                         goto disconnect_rcu_unlock;
3179                 }
3180
3181                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3182                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3183                         goto disconnect_rcu_unlock;
3184                 }
3185
3186                 rcu_read_unlock();
3187         }
3188
3189         if (integrity_alg[0]) {
3190                 int hash_size;
3191
3192                 /*
3193                  * We can only change the peer data integrity algorithm
3194                  * here.  Changing our own data integrity algorithm
3195                  * requires that we send a P_PROTOCOL_UPDATE packet at
3196                  * the same time; otherwise, the peer has no way to
3197                  * know at which packet boundary the algorithm
3198                  * changes.
3199                  */
3200
3201                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3202                 if (!peer_integrity_tfm) {
3203                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3204                                  integrity_alg);
3205                         goto disconnect;
3206                 }
3207
3208                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3209                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3210                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3211                 if (!(int_dig_in && int_dig_vv)) {
3212                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3213                         goto disconnect;
3214                 }
3215         }
3216
3217         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3218         if (!new_net_conf) {
3219                 conn_err(tconn, "Allocation of new net_conf failed\n");
3220                 goto disconnect;
3221         }
3222
3223         mutex_lock(&tconn->data.mutex);
3224         mutex_lock(&tconn->conf_update);
3225         old_net_conf = tconn->net_conf;
3226         *new_net_conf = *old_net_conf;
3227
3228         new_net_conf->wire_protocol = p_proto;
3229         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3230         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3231         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3232         new_net_conf->two_primaries = p_two_primaries;
3233
3234         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3235         mutex_unlock(&tconn->conf_update);
3236         mutex_unlock(&tconn->data.mutex);
3237
3238         crypto_free_hash(tconn->peer_integrity_tfm);
3239         kfree(tconn->int_dig_in);
3240         kfree(tconn->int_dig_vv);
3241         tconn->peer_integrity_tfm = peer_integrity_tfm;
3242         tconn->int_dig_in = int_dig_in;
3243         tconn->int_dig_vv = int_dig_vv;
3244
3245         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3246                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3247                           integrity_alg[0] ? integrity_alg : "(none)");
3248
3249         synchronize_rcu();
3250         kfree(old_net_conf);
3251         return 0;
3252
3253 disconnect_rcu_unlock:
3254         rcu_read_unlock();
3255 disconnect:
3256         crypto_free_hash(peer_integrity_tfm);
3257         kfree(int_dig_in);
3258         kfree(int_dig_vv);
3259         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3260         return -EIO;
3261 }
3262
3263 /* helper function
3264  * input: alg name, feature name
3265  * return: NULL (alg name was "")
3266  *         ERR_PTR(error) if something goes wrong
3267  *         or the crypto hash ptr, if it worked out ok. */
3268 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3269                 const char *alg, const char *name)
3270 {
3271         struct crypto_hash *tfm;
3272
3273         if (!alg[0])
3274                 return NULL;
3275
3276         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3277         if (IS_ERR(tfm)) {
3278                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3279                         alg, name, PTR_ERR(tfm));
3280                 return tfm;
3281         }
3282         return tfm;
3283 }
3284
3285 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3286 {
3287         void *buffer = tconn->data.rbuf;
3288         int size = pi->size;
3289
3290         while (size) {
3291                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3292                 s = drbd_recv(tconn, buffer, s);
3293                 if (s <= 0) {
3294                         if (s < 0)
3295                                 return s;
3296                         break;
3297                 }
3298                 size -= s;
3299         }
3300         if (size)
3301                 return -EIO;
3302         return 0;
3303 }
3304
3305 /*
3306  * config_unknown_volume  -  device configuration command for unknown volume
3307  *
3308  * When a device is added to an existing connection, the node on which the
3309  * device is added first will send configuration commands to its peer but the
3310  * peer will not know about the device yet.  It will warn and ignore these
3311  * commands.  Once the device is added on the second node, the second node will
3312  * send the same device configuration commands, but in the other direction.
3313  *
3314  * (We can also end up here if drbd is misconfigured.)
3315  */
3316 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3317 {
3318         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3319                   cmdname(pi->cmd), pi->vnr);
3320         return ignore_remaining_packet(tconn, pi);
3321 }
3322
3323 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3324 {
3325         struct drbd_conf *mdev;
3326         struct p_rs_param_95 *p;
3327         unsigned int header_size, data_size, exp_max_sz;
3328         struct crypto_hash *verify_tfm = NULL;
3329         struct crypto_hash *csums_tfm = NULL;
3330         struct net_conf *old_net_conf, *new_net_conf = NULL;
3331         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3332         const int apv = tconn->agreed_pro_version;
3333         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3334         int fifo_size = 0;
3335         int err;
3336
3337         mdev = vnr_to_mdev(tconn, pi->vnr);
3338         if (!mdev)
3339                 return config_unknown_volume(tconn, pi);
3340
3341         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3342                     : apv == 88 ? sizeof(struct p_rs_param)
3343                                         + SHARED_SECRET_MAX
3344                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3345                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3346
3347         if (pi->size > exp_max_sz) {
3348                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3349                     pi->size, exp_max_sz);
3350                 return -EIO;
3351         }
3352
3353         if (apv <= 88) {
3354                 header_size = sizeof(struct p_rs_param);
3355                 data_size = pi->size - header_size;
3356         } else if (apv <= 94) {
3357                 header_size = sizeof(struct p_rs_param_89);
3358                 data_size = pi->size - header_size;
3359                 D_ASSERT(data_size == 0);
3360         } else {
3361                 header_size = sizeof(struct p_rs_param_95);
3362                 data_size = pi->size - header_size;
3363                 D_ASSERT(data_size == 0);
3364         }
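        /* With apv == 88 the verify-alg name arrives as trailing data after
         * struct p_rs_param; from apv >= 89 on, verify_alg and csums_alg are
         * part of the fixed-size parameter header, which is why data_size is
         * expected to be 0 in those cases. */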
3365
3366         /* initialize verify_alg and csums_alg */
3367         p = pi->data;
3368         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3369
3370         err = drbd_recv_all(mdev->tconn, p, header_size);
3371         if (err)
3372                 return err;
3373
3374         mutex_lock(&mdev->tconn->conf_update);
3375         old_net_conf = mdev->tconn->net_conf;
3376         if (get_ldev(mdev)) {
3377                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3378                 if (!new_disk_conf) {
3379                         put_ldev(mdev);
3380                         mutex_unlock(&mdev->tconn->conf_update);
3381                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3382                         return -ENOMEM;
3383                 }
3384
3385                 old_disk_conf = mdev->ldev->disk_conf;
3386                 *new_disk_conf = *old_disk_conf;
3387
3388                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3389         }
3390
3391         if (apv >= 88) {
3392                 if (apv == 88) {
3393                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3394                                 dev_err(DEV, "verify-alg of wrong size, "
3395                                         "peer wants %u, accepting only up to %u bytes\n",
3396                                         data_size, SHARED_SECRET_MAX);
3397                                 err = -EIO;
3398                                 goto reconnect;
3399                         }
3400
3401                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3402                         if (err)
3403                                 goto reconnect;
3404                         /* we expect NUL terminated string */
3405                         /* but just in case someone tries to be evil */
3406                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3407                         p->verify_alg[data_size-1] = 0;
3408
3409                 } else /* apv >= 89 */ {
3410                         /* we still expect NUL terminated strings */
3411                         /* but just in case someone tries to be evil */
3412                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3413                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3414                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3415                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3416                 }
3417
3418                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3419                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3420                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3421                                     old_net_conf->verify_alg, p->verify_alg);
3422                                 goto disconnect;
3423                         }
3424                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3425                                         p->verify_alg, "verify-alg");
3426                         if (IS_ERR(verify_tfm)) {
3427                                 verify_tfm = NULL;
3428                                 goto disconnect;
3429                         }
3430                 }
3431
3432                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3433                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3434                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3435                                     old_net_conf->csums_alg, p->csums_alg);
3436                                 goto disconnect;
3437                         }
3438                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3439                                         p->csums_alg, "csums-alg");
3440                         if (IS_ERR(csums_tfm)) {
3441                                 csums_tfm = NULL;
3442                                 goto disconnect;
3443                         }
3444                 }
3445
3446                 if (apv > 94 && new_disk_conf) {
3447                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3448                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3449                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3450                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3451
3452                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3453                         if (fifo_size != mdev->rs_plan_s->size) {
3454                                 new_plan = fifo_alloc(fifo_size);
3455                                 if (!new_plan) {
3456                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3457                                         put_ldev(mdev);
3458                                         goto disconnect;
3459                                 }
3460                         }
3461                 }
3462
3463                 if (verify_tfm || csums_tfm) {
3464                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3465                         if (!new_net_conf) {
3466                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3467                                 goto disconnect;
3468                         }
3469
3470                         *new_net_conf = *old_net_conf;
3471
3472                         if (verify_tfm) {
3473                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3474                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3475                                 crypto_free_hash(mdev->tconn->verify_tfm);
3476                                 mdev->tconn->verify_tfm = verify_tfm;
3477                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3478                         }
3479                         if (csums_tfm) {
3480                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3481                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3482                                 crypto_free_hash(mdev->tconn->csums_tfm);
3483                                 mdev->tconn->csums_tfm = csums_tfm;
3484                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3485                         }
3486                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3487                 }
3488         }
3489
3490         if (new_disk_conf) {
3491                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3492                 put_ldev(mdev);
3493         }
3494
3495         if (new_plan) {
3496                 old_plan = mdev->rs_plan_s;
3497                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3498         }
3499
3500         mutex_unlock(&mdev->tconn->conf_update);
3501         synchronize_rcu();
3502         if (new_net_conf)
3503                 kfree(old_net_conf);
3504         kfree(old_disk_conf);
3505         kfree(old_plan);
3506
3507         return 0;
3508
3509 reconnect:
3510         if (new_disk_conf) {
3511                 put_ldev(mdev);
3512                 kfree(new_disk_conf);
3513         }
3514         mutex_unlock(&mdev->tconn->conf_update);
3515         return -EIO;
3516
3517 disconnect:
3518         kfree(new_plan);
3519         if (new_disk_conf) {
3520                 put_ldev(mdev);
3521                 kfree(new_disk_conf);
3522         }
3523         mutex_unlock(&mdev->tconn->conf_update);
3524         /* just for completeness: actually not needed,
3525          * as this is not reached if csums_tfm was ok. */
3526         crypto_free_hash(csums_tfm);
3527         /* but free the verify_tfm again, if csums_tfm did not work out */
3528         crypto_free_hash(verify_tfm);
3529         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3530         return -EIO;
3531 }
3532
3533 /* warn if the arguments differ by more than 12.5% */
3534 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3535         const char *s, sector_t a, sector_t b)
3536 {
3537         sector_t d;
3538         if (a == 0 || b == 0)
3539                 return;
3540         d = (a > b) ? (a - b) : (b - a);
3541         if (d > (a>>3) || d > (b>>3))
3542                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3543                      (unsigned long long)a, (unsigned long long)b);
3544 }
3545
3546 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3547 {
3548         struct drbd_conf *mdev;
3549         struct p_sizes *p = pi->data;
3550         enum determine_dev_size dd = unchanged;
3551         sector_t p_size, p_usize, my_usize;
3552         int ldsc = 0; /* local disk size changed */
3553         enum dds_flags ddsf;
3554
3555         mdev = vnr_to_mdev(tconn, pi->vnr);
3556         if (!mdev)
3557                 return config_unknown_volume(tconn, pi);
3558
3559         p_size = be64_to_cpu(p->d_size);
3560         p_usize = be64_to_cpu(p->u_size);
3561
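        /* d_size is the capacity of the peer's lower-level (backing) device,
         * u_size the size the user requested on the peer, with 0 meaning "no
         * explicit limit"; compare the warn_if_differ_considerably() calls
         * and the min_not_zero() handling below. */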
3562         /* just store the peer's disk size for now.
3563          * we still need to figure out whether we accept that. */
3564         mdev->p_size = p_size;
3565
3566         if (get_ldev(mdev)) {
3567                 rcu_read_lock();
3568                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3569                 rcu_read_unlock();
3570
3571                 warn_if_differ_considerably(mdev, "lower level device sizes",
3572                            p_size, drbd_get_max_capacity(mdev->ldev));
3573                 warn_if_differ_considerably(mdev, "user requested size",
3574                                             p_usize, my_usize);
3575
3576                 /* if this is the first connect, or an otherwise expected
3577                  * param exchange, choose the minimum */
3578                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3579                         p_usize = min_not_zero(my_usize, p_usize);
3580
3581                 /* Never shrink a device with usable data during connect.
3582                    But allow online shrinking if we are connected. */
3583                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3584                     drbd_get_capacity(mdev->this_bdev) &&
3585                     mdev->state.disk >= D_OUTDATED &&
3586                     mdev->state.conn < C_CONNECTED) {
3587                         dev_err(DEV, "The peer's disk size is too small!\n");
3588                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3589                         put_ldev(mdev);
3590                         return -EIO;
3591                 }
3592
3593                 if (my_usize != p_usize) {
3594                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3595
3596                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3597                         if (!new_disk_conf) {
3598                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3599                                 put_ldev(mdev);
3600                                 return -ENOMEM;
3601                         }
3602
3603                         mutex_lock(&mdev->tconn->conf_update);
3604                         old_disk_conf = mdev->ldev->disk_conf;
3605                         *new_disk_conf = *old_disk_conf;
3606                         new_disk_conf->disk_size = p_usize;
3607
3608                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3609                         mutex_unlock(&mdev->tconn->conf_update);
3610                         synchronize_rcu();
3611                         kfree(old_disk_conf);
3612
3613                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3614                                  (unsigned long)p_usize);
3615                 }
3616
3617                 put_ldev(mdev);
3618         }
3619
3620         ddsf = be16_to_cpu(p->dds_flags);
3621         if (get_ldev(mdev)) {
3622                 dd = drbd_determine_dev_size(mdev, ddsf);
3623                 put_ldev(mdev);
3624                 if (dd == dev_size_error)
3625                         return -EIO;
3626                 drbd_md_sync(mdev);
3627         } else {
3628                 /* I am diskless, need to accept the peer's size. */
3629                 drbd_set_my_capacity(mdev, p_size);
3630         }
3631
3632         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3633         drbd_reconsider_max_bio_size(mdev);
3634
3635         if (get_ldev(mdev)) {
3636                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3637                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3638                         ldsc = 1;
3639                 }
3640
3641                 put_ldev(mdev);
3642         }
3643
3644         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3645                 if (be64_to_cpu(p->c_size) !=
3646                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3647                         /* we have different sizes, probably peer
3648                          * needs to know my new size... */
3649                         drbd_send_sizes(mdev, 0, ddsf);
3650                 }
3651                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3652                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3653                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3654                             mdev->state.disk >= D_INCONSISTENT) {
3655                                 if (ddsf & DDSF_NO_RESYNC)
3656                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3657                                 else
3658                                         resync_after_online_grow(mdev);
3659                         } else
3660                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3661                 }
3662         }
3663
3664         return 0;
3665 }
3666
3667 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3668 {
3669         struct drbd_conf *mdev;
3670         struct p_uuids *p = pi->data;
3671         u64 *p_uuid;
3672         int i, updated_uuids = 0;
3673
3674         mdev = vnr_to_mdev(tconn, pi->vnr);
3675         if (!mdev)
3676                 return config_unknown_volume(tconn, pi);
3677
3678         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3679
3680         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3681                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3682
3683         kfree(mdev->p_uuid);
3684         mdev->p_uuid = p_uuid;
3685
3686         if (mdev->state.conn < C_CONNECTED &&
3687             mdev->state.disk < D_INCONSISTENT &&
3688             mdev->state.role == R_PRIMARY &&
3689             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3690                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3691                     (unsigned long long)mdev->ed_uuid);
3692                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3693                 return -EIO;
3694         }
3695
3696         if (get_ldev(mdev)) {
3697                 int skip_initial_sync =
3698                         mdev->state.conn == C_CONNECTED &&
3699                         mdev->tconn->agreed_pro_version >= 90 &&
3700                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3701                         (p_uuid[UI_FLAGS] & 8);
3702                 if (skip_initial_sync) {
3703                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3704                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3705                                         "clear_n_write from receive_uuids",
3706                                         BM_LOCKED_TEST_ALLOWED);
3707                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3708                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3709                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3710                                         CS_VERBOSE, NULL);
3711                         drbd_md_sync(mdev);
3712                         updated_uuids = 1;
3713                 }
3714                 put_ldev(mdev);
3715         } else if (mdev->state.disk < D_INCONSISTENT &&
3716                    mdev->state.role == R_PRIMARY) {
3717                 /* I am a diskless primary, the peer just created a new current UUID
3718                    for me. */
3719                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3720         }
3721
3722         /* Before we test for the disk state, we should wait until any possibly
3723            ongoing cluster-wide state change has finished. That is important if
3724            we are primary and are detaching from our disk. We need to see the
3725            new disk state... */
3726         mutex_lock(mdev->state_mutex);
3727         mutex_unlock(mdev->state_mutex);
3728         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3729                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3730
3731         if (updated_uuids)
3732                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3733
3734         return 0;
3735 }
3736
3737 /**
3738  * convert_state() - Converts the peer's view of the cluster state to our point of view
3739  * @ps:         The state as seen by the peer.
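 *
 * Example (roles and disks only): if the peer reports role=Primary,
 * peer=Secondary, disk=UpToDate, pdsk=Inconsistent, then seen from our
 * side that is peer=Primary, role=Secondary, pdsk=UpToDate,
 * disk=Inconsistent.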
3740  */
3741 static union drbd_state convert_state(union drbd_state ps)
3742 {
3743         union drbd_state ms;
3744
3745         static enum drbd_conns c_tab[] = {
3746                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3747                 [C_CONNECTED] = C_CONNECTED,
3748
3749                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3750                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3751                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3752                 [C_VERIFY_S]       = C_VERIFY_T,
3753                 [C_MASK]   = C_MASK,
3754         };
3755
3756         ms.i = ps.i;
3757
3758         ms.conn = c_tab[ps.conn];
3759         ms.peer = ps.role;
3760         ms.role = ps.peer;
3761         ms.pdsk = ps.disk;
3762         ms.disk = ps.pdsk;
3763         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3764
3765         return ms;
3766 }
3767
3768 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3769 {
3770         struct drbd_conf *mdev;
3771         struct p_req_state *p = pi->data;
3772         union drbd_state mask, val;
3773         enum drbd_state_rv rv;
3774
3775         mdev = vnr_to_mdev(tconn, pi->vnr);
3776         if (!mdev)
3777                 return -EIO;
3778
3779         mask.i = be32_to_cpu(p->mask);
3780         val.i = be32_to_cpu(p->val);
3781
3782         if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3783             mutex_is_locked(mdev->state_mutex)) {
3784                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3785                 return 0;
3786         }
3787
3788         mask = convert_state(mask);
3789         val = convert_state(val);
3790
3791         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3792         drbd_send_sr_reply(mdev, rv);
3793
3794         drbd_md_sync(mdev);
3795
3796         return 0;
3797 }
3798
3799 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3800 {
3801         struct p_req_state *p = pi->data;
3802         union drbd_state mask, val;
3803         enum drbd_state_rv rv;
3804
3805         mask.i = be32_to_cpu(p->mask);
3806         val.i = be32_to_cpu(p->val);
3807
3808         if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3809             mutex_is_locked(&tconn->cstate_mutex)) {
3810                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3811                 return 0;
3812         }
3813
3814         mask = convert_state(mask);
3815         val = convert_state(val);
3816
3817         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3818         conn_send_sr_reply(tconn, rv);
3819
3820         return 0;
3821 }
3822
3823 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3824 {
3825         struct drbd_conf *mdev;
3826         struct p_state *p = pi->data;
3827         union drbd_state os, ns, peer_state;
3828         enum drbd_disk_state real_peer_disk;
3829         enum chg_state_flags cs_flags;
3830         int rv;
3831
3832         mdev = vnr_to_mdev(tconn, pi->vnr);
3833         if (!mdev)
3834                 return config_unknown_volume(tconn, pi);
3835
3836         peer_state.i = be32_to_cpu(p->state);
3837
3838         real_peer_disk = peer_state.disk;
3839         if (peer_state.disk == D_NEGOTIATING) {
3840                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3841                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3842         }
3843
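        /* Sample our own state under req_lock.  drbd_sync_handshake() below
         * may sleep, so before committing a new state we re-take the lock
         * and compare against the then-current state; if it changed in the
         * meantime we start over (goto retry). */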
3844         spin_lock_irq(&mdev->tconn->req_lock);
3845  retry:
3846         os = ns = drbd_read_state(mdev);
3847         spin_unlock_irq(&mdev->tconn->req_lock);
3848
3849         /* If some other part of the code (asender thread, timeout)
3850          * already decided to close the connection again,
3851          * we must not "re-establish" it here. */
3852         if (os.conn <= C_TEAR_DOWN)
3853                 return -ECONNRESET;
3854
3855         /* If this is the "end of sync" confirmation, usually the peer disk
3856          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty (0 bits
3857          * set) resync started in PausedSyncT, or if the timing of pause-/
3858          * unpause-sync events has been "just right", the peer disk may
3859          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3860          */
3861         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3862             real_peer_disk == D_UP_TO_DATE &&
3863             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3864                 /* If we are (becoming) SyncSource, but peer is still in sync
3865                  * preparation, ignore its uptodate-ness to avoid flapping, it
3866                  * will change to inconsistent once the peer reaches active
3867                  * syncing states.
3868                  * It may have changed syncer-paused flags, however, so we
3869                  * cannot ignore this completely. */
3870                 if (peer_state.conn > C_CONNECTED &&
3871                     peer_state.conn < C_SYNC_SOURCE)
3872                         real_peer_disk = D_INCONSISTENT;
3873
3874                 /* if peer_state changes to connected at the same time,
3875                  * it explicitly notifies us that it finished resync.
3876                  * Maybe we should finish it up, too? */
3877                 else if (os.conn >= C_SYNC_SOURCE &&
3878                          peer_state.conn == C_CONNECTED) {
3879                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3880                                 drbd_resync_finished(mdev);
3881                         return 0;
3882                 }
3883         }
3884
3885         /* explicit verify finished notification, stop sector reached. */
3886         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3887             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3888                 ov_out_of_sync_print(mdev);
3889                 drbd_resync_finished(mdev);
3890                 return 0;
3891         }
3892
3893         /* peer says his disk is inconsistent, while we think it is uptodate,
3894          * and this happens while the peer still thinks we have a sync going on,
3895          * but we think we are already done with the sync.
3896          * We ignore this to avoid flapping pdsk.
3897          * This should not happen, if the peer is a recent version of drbd. */
3898         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3899             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3900                 real_peer_disk = D_UP_TO_DATE;
3901
3902         if (ns.conn == C_WF_REPORT_PARAMS)
3903                 ns.conn = C_CONNECTED;
3904
3905         if (peer_state.conn == C_AHEAD)
3906                 ns.conn = C_BEHIND;
3907
3908         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3909             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3910                 int cr; /* consider resync */
3911
3912                 /* if we established a new connection */
3913                 cr  = (os.conn < C_CONNECTED);
3914                 /* if we had an established connection
3915                  * and one of the nodes newly attaches a disk */
3916                 cr |= (os.conn == C_CONNECTED &&
3917                        (peer_state.disk == D_NEGOTIATING ||
3918                         os.disk == D_NEGOTIATING));
3919                 /* if we have both been inconsistent, and the peer has been
3920                  * forced to be UpToDate with --overwrite-data */
3921                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3922                 /* if we had been plain connected, and the admin requested to
3923                  * start a sync by "invalidate" or "invalidate-remote" */
3924                 cr |= (os.conn == C_CONNECTED &&
3925                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3926                                  peer_state.conn <= C_WF_BITMAP_T));
3927
3928                 if (cr)
3929                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3930
3931                 put_ldev(mdev);
3932                 if (ns.conn == C_MASK) {
3933                         ns.conn = C_CONNECTED;
3934                         if (mdev->state.disk == D_NEGOTIATING) {
3935                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3936                         } else if (peer_state.disk == D_NEGOTIATING) {
3937                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3938                                 peer_state.disk = D_DISKLESS;
3939                                 real_peer_disk = D_DISKLESS;
3940                         } else {
3941                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3942                                         return -EIO;
3943                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3944                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3945                                 return -EIO;
3946                         }
3947                 }
3948         }
3949
3950         spin_lock_irq(&mdev->tconn->req_lock);
3951         if (os.i != drbd_read_state(mdev).i)
3952                 goto retry;
3953         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3954         ns.peer = peer_state.role;
3955         ns.pdsk = real_peer_disk;
3956         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3957         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3958                 ns.disk = mdev->new_state_tmp.disk;
3959         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3960         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3961             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3962                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3963                    for temporary network outages! */
3964                 spin_unlock_irq(&mdev->tconn->req_lock);
3965                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3966                 tl_clear(mdev->tconn);
3967                 drbd_uuid_new_current(mdev);
3968                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3969                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3970                 return -EIO;
3971         }
3972         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3973         ns = drbd_read_state(mdev);
3974         spin_unlock_irq(&mdev->tconn->req_lock);
3975
3976         if (rv < SS_SUCCESS) {
3977                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3978                 return -EIO;
3979         }
3980
3981         if (os.conn > C_WF_REPORT_PARAMS) {
3982                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3983                     peer_state.disk != D_NEGOTIATING ) {
3984                         /* we want resync, peer has not yet decided to sync... */
3985                         /* Nowadays only used when forcing a node into the primary role and
3986                            setting its disk to UpToDate at the same time */
3987                         drbd_send_uuids(mdev);
3988                         drbd_send_current_state(mdev);
3989                 }
3990         }
3991
3992         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3993
3994         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3995
3996         return 0;
3997 }
3998
3999 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
4000 {
4001         struct drbd_conf *mdev;
4002         struct p_rs_uuid *p = pi->data;
4003
4004         mdev = vnr_to_mdev(tconn, pi->vnr);
4005         if (!mdev)
4006                 return -EIO;
4007
4008         wait_event(mdev->misc_wait,
4009                    mdev->state.conn == C_WF_SYNC_UUID ||
4010                    mdev->state.conn == C_BEHIND ||
4011                    mdev->state.conn < C_CONNECTED ||
4012                    mdev->state.disk < D_NEGOTIATING);
4013
4014         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4015
4016         /* Here the _drbd_uuid_ functions are right, current should
4017            _not_ be rotated into the history */
4018         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4019                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4020                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4021
4022                 drbd_print_uuids(mdev, "updated sync uuid");
4023                 drbd_start_resync(mdev, C_SYNC_TARGET);
4024
4025                 put_ldev(mdev);
4026         } else
4027                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4028
4029         return 0;
4030 }
4031
4032 /**
4033  * receive_bitmap_plain
4034  *
4035  * Return 0 when done, 1 when another iteration is needed, and a negative error
4036  * code upon failure.
4037  */
4038 static int
4039 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4040                      unsigned long *p, struct bm_xfer_ctx *c)
4041 {
4042         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4043                                  drbd_header_size(mdev->tconn);
4044         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4045                                        c->bm_words - c->word_offset);
4046         unsigned int want = num_words * sizeof(*p);
4047         int err;
4048
4049         if (want != size) {
4050                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4051                 return -EIO;
4052         }
4053         if (want == 0)
4054                 return 0;
4055         err = drbd_recv_all(mdev->tconn, p, want);
4056         if (err)
4057                 return err;
4058
4059         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4060
4061         c->word_offset += num_words;
4062         c->bit_offset = c->word_offset * BITS_PER_LONG;
4063         if (c->bit_offset > c->bm_bits)
4064                 c->bit_offset = c->bm_bits;
4065
4066         return 1;
4067 }
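/* A rough worked example of the sizing above (hypothetical numbers, not
 * derived from the real DRBD_SOCKET_BUFFER_SIZE or header size): with a
 * 4096 byte socket buffer and a 16 byte header, data_size would be 4080,
 * so with 8 byte longs at most 510 words (4080 bytes) of bitmap fit into
 * one plain bitmap packet; the final packet of a transfer simply carries
 * the remaining c->bm_words - c->word_offset words.
 */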
4068
4069 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4070 {
4071         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4072 }
4073
4074 static int dcbp_get_start(struct p_compressed_bm *p)
4075 {
4076         return (p->encoding & 0x80) != 0;
4077 }
4078
4079 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4080 {
4081         return (p->encoding >> 4) & 0x7;
4082 }
4083
4084 /**
4085  * recv_bm_rle_bits
4086  *
4087  * Return 0 when done, 1 when another iteration is needed, and a negative error
4088  * code upon failure.
4089  */
4090 static int
4091 recv_bm_rle_bits(struct drbd_conf *mdev,
4092                 struct p_compressed_bm *p,
4093                  struct bm_xfer_ctx *c,
4094                  unsigned int len)
4095 {
4096         struct bitstream bs;
4097         u64 look_ahead;
4098         u64 rl;
4099         u64 tmp;
4100         unsigned long s = c->bit_offset;
4101         unsigned long e;
4102         int toggle = dcbp_get_start(p);
4103         int have;
4104         int bits;
4105
4106         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4107
4108         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4109         if (bits < 0)
4110                 return -EIO;
4111
4112         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4113                 bits = vli_decode_bits(&rl, look_ahead);
4114                 if (bits <= 0)
4115                         return -EIO;
4116
4117                 if (toggle) {
4118                         e = s + rl - 1;
4119                         if (e >= c->bm_bits) {
4120                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4121                                 return -EIO;
4122                         }
4123                         _drbd_bm_set_bits(mdev, s, e);
4124                 }
4125
4126                 if (have < bits) {
4127                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4128                                 have, bits, look_ahead,
4129                                 (unsigned int)(bs.cur.b - p->code),
4130                                 (unsigned int)bs.buf_len);
4131                         return -EIO;
4132                 }
4133                 look_ahead >>= bits;
4134                 have -= bits;
4135
4136                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4137                 if (bits < 0)
4138                         return -EIO;
4139                 look_ahead |= tmp << have;
4140                 have += bits;
4141         }
4142
4143         c->bit_offset = s;
4144         bm_xfer_ctx_bit_to_word_offset(c);
4145
4146         return (s != c->bm_bits);
4147 }
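/* Decoding example (hypothetical run lengths, for illustration only):
 * with dcbp_get_start(p) == 0 and decoded run lengths 5, 3 and 7, the
 * stream means "5 bits clear, 3 bits set, 7 bits clear"; starting at
 * c->bit_offset == 0, only bits 5..7 are set via _drbd_bm_set_bits(),
 * and c->bit_offset advances to 15 afterwards.
 */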
4148
4149 /**
4150  * decode_bitmap_c
4151  *
4152  * Return 0 when done, 1 when another iteration is needed, and a negative error
4153  * code upon failure.
4154  */
4155 static int
4156 decode_bitmap_c(struct drbd_conf *mdev,
4157                 struct p_compressed_bm *p,
4158                 struct bm_xfer_ctx *c,
4159                 unsigned int len)
4160 {
4161         if (dcbp_get_code(p) == RLE_VLI_Bits)
4162                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4163
4164         /* other variants had been implemented for evaluation,
4165          * but have been dropped as this one turned out to be "best"
4166          * during all our tests. */
4167
4168         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4169         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4170         return -EIO;
4171 }
4172
4173 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4174                 const char *direction, struct bm_xfer_ctx *c)
4175 {
4176         /* what would it take to transfer it "plaintext" */
4177         unsigned int header_size = drbd_header_size(mdev->tconn);
4178         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4179         unsigned int plain =
4180                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4181                 c->bm_words * sizeof(unsigned long);
4182         unsigned int total = c->bytes[0] + c->bytes[1];
4183         unsigned int r;
4184
4185         /* total cannot be zero, but just in case: */
4186         if (total == 0)
4187                 return;
4188
4189         /* don't report if not compressed */
4190         if (total >= plain)
4191                 return;
4192
4193         /* total < plain. check for overflow, still */
4194         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4195                                     : (1000 * total / plain);
4196
4197         if (r > 1000)
4198                 r = 1000;
4199
4200         r = 1000 - r;
4201         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4202              "total %u; compression: %u.%u%%\n",
4203                         direction,
4204                         c->bytes[1], c->packets[1],
4205                         c->bytes[0], c->packets[0],
4206                         total, r/10, r % 10);
4207 }
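/* Worked example for the ratio computed above (hypothetical sizes): with
 * plain == 131072 and total == 4096, total is well below UINT_MAX/1000,
 * so r = 1000 * 4096 / 131072 = 31, then r = 1000 - 31 = 969, and the
 * log line reports "compression: 96.9%".
 */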
4208
4209 /* Since we are processing the bitfield from lower addresses to higher,
4210    it does not matter whether we process it in 32 bit or 64 bit chunks,
4211    as long as it is little endian. (Understand it as a byte stream,
4212    beginning with the lowest byte...) If we used big endian, we would
4213    need to process it from the highest address to the lowest, in order
4214    to be agnostic to the 32 vs 64 bit issue.
4215
4216    Returns 0 on success, a negative error code otherwise. */
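/* Illustration (assumed byte values, not taken from a real bitmap): the
 * little endian byte stream 0x01 0x00 0x00 0x00 0x00 0x00 0x00 0x80
 * describes the same bits whether it is read as two 32 bit words
 * (0x00000001, 0x80000000) or as one 64 bit word (0x8000000000000001):
 * bit 0 and bit 63 are set either way.
 */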
4217 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4218 {
4219         struct drbd_conf *mdev;
4220         struct bm_xfer_ctx c;
4221         int err;
4222
4223         mdev = vnr_to_mdev(tconn, pi->vnr);
4224         if (!mdev)
4225                 return -EIO;
4226
4227         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4228         /* you are supposed to send additional out-of-sync information
4229          * if you actually set bits during this phase */
4230
4231         c = (struct bm_xfer_ctx) {
4232                 .bm_bits = drbd_bm_bits(mdev),
4233                 .bm_words = drbd_bm_words(mdev),
4234         };
4235
4236         for(;;) {
4237                 if (pi->cmd == P_BITMAP)
4238                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4239                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4240                         /* MAYBE: sanity check that we speak proto >= 90,
4241                          * and the feature is enabled! */
4242                         struct p_compressed_bm *p = pi->data;
4243
4244                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4245                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4246                                 err = -EIO;
4247                                 goto out;
4248                         }
4249                         if (pi->size <= sizeof(*p)) {
4250                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4251                                 err = -EIO;
4252                                 goto out;
4253                         }
4254                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4255                         if (err)
4256                                goto out;
4257                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4258                 } else {
4259                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4260                         err = -EIO;
4261                         goto out;
4262                 }
4263
4264                 c.packets[pi->cmd == P_BITMAP]++;
4265                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4266
4267                 if (err <= 0) {
4268                         if (err < 0)
4269                                 goto out;
4270                         break;
4271                 }
4272                 err = drbd_recv_header(mdev->tconn, pi);
4273                 if (err)
4274                         goto out;
4275         }
4276
4277         INFO_bm_xfer_stats(mdev, "receive", &c);
4278
4279         if (mdev->state.conn == C_WF_BITMAP_T) {
4280                 enum drbd_state_rv rv;
4281
4282                 err = drbd_send_bitmap(mdev);
4283                 if (err)
4284                         goto out;
4285                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4286                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4287                 D_ASSERT(rv == SS_SUCCESS);
4288         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4289                 /* admin may have requested C_DISCONNECTING,
4290                  * other threads may have noticed network errors */
4291                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4292                     drbd_conn_str(mdev->state.conn));
4293         }
4294         err = 0;
4295
4296  out:
4297         drbd_bm_unlock(mdev);
4298         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4299                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4300         return err;
4301 }
4302
4303 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4304 {
4305         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4306                  pi->cmd, pi->size);
4307
4308         return ignore_remaining_packet(tconn, pi);
4309 }
4310
4311 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4312 {
4313         /* Make sure we've acked all the TCP data associated
4314          * with the data requests being unplugged */
4315         drbd_tcp_quickack(tconn->data.socket);
4316
4317         return 0;
4318 }
4319
4320 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4321 {
4322         struct drbd_conf *mdev;
4323         struct p_block_desc *p = pi->data;
4324
4325         mdev = vnr_to_mdev(tconn, pi->vnr);
4326         if (!mdev)
4327                 return -EIO;
4328
4329         switch (mdev->state.conn) {
4330         case C_WF_SYNC_UUID:
4331         case C_WF_BITMAP_T:
4332         case C_BEHIND:
4333                 break;
4334         default:
4335                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4336                                 drbd_conn_str(mdev->state.conn));
4337         }
4338
4339         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4340
4341         return 0;
4342 }
4343
4344 struct data_cmd {
4345         int expect_payload;
4346         size_t pkt_size;
4347         int (*fn)(struct drbd_tconn *, struct packet_info *);
4348 };
4349
4350 static struct data_cmd drbd_cmd_handler[] = {
4351         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4352         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4353         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4354         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4355         [P_BITMAP]          = { 1, 0, receive_bitmap },
4356         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4357         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4358         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4359         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4360         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4361         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4362         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4363         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4364         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4365         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4366         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4367         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4368         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4369         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4370         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4371         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4372         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4373         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4374         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4375 };
4376
4377 static void drbdd(struct drbd_tconn *tconn)
4378 {
4379         struct packet_info pi;
4380         size_t shs; /* sub header size */
4381         int err;
4382
4383         while (get_t_state(&tconn->receiver) == RUNNING) {
4384                 struct data_cmd *cmd;
4385
4386                 drbd_thread_current_set_cpu(&tconn->receiver);
4387                 if (drbd_recv_header(tconn, &pi))
4388                         goto err_out;
4389
4390                 cmd = &drbd_cmd_handler[pi.cmd];
4391                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4392                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4393                                  cmdname(pi.cmd), pi.cmd);
4394                         goto err_out;
4395                 }
4396
4397                 shs = cmd->pkt_size;
4398                 if (pi.size > shs && !cmd->expect_payload) {
4399                         conn_err(tconn, "No payload expected %s l:%d\n",
4400                                  cmdname(pi.cmd), pi.size);
4401                         goto err_out;
4402                 }
4403
4404                 if (shs) {
4405                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4406                         if (err)
4407                                 goto err_out;
4408                         pi.size -= shs;
4409                 }
4410
4411                 err = cmd->fn(tconn, &pi);
4412                 if (err) {
4413                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4414                                  cmdname(pi.cmd), err, pi.size);
4415                         goto err_out;
4416                 }
4417         }
4418         return;
4419
4420     err_out:
4421         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4422 }
4423
4424 void conn_flush_workqueue(struct drbd_tconn *tconn)
4425 {
4426         struct drbd_wq_barrier barr;
4427
4428         barr.w.cb = w_prev_work_done;
4429         barr.w.tconn = tconn;
4430         init_completion(&barr.done);
4431         drbd_queue_work(&tconn->sender_work, &barr.w);
4432         wait_for_completion(&barr.done);
4433 }
4434
4435 static void conn_disconnect(struct drbd_tconn *tconn)
4436 {
4437         struct drbd_conf *mdev;
4438         enum drbd_conns oc;
4439         int vnr;
4440
4441         if (tconn->cstate == C_STANDALONE)
4442                 return;
4443
4444         /* We are about to start the cleanup after connection loss.
4445          * Make sure drbd_make_request knows about that.
4446          * Usually we should be in some network failure state already,
4447          * but just in case we are not, we fix it up here.
4448          */
4449         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4450
4451         /* asender does not clean up anything. it must not interfere, either */
4452         drbd_thread_stop(&tconn->asender);
4453         drbd_free_sock(tconn);
4454
4455         rcu_read_lock();
4456         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4457                 kref_get(&mdev->kref);
4458                 rcu_read_unlock();
4459                 drbd_disconnected(mdev);
4460                 kref_put(&mdev->kref, &drbd_minor_destroy);
4461                 rcu_read_lock();
4462         }
4463         rcu_read_unlock();
4464
4465         if (!list_empty(&tconn->current_epoch->list))
4466                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4467         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4468         atomic_set(&tconn->current_epoch->epoch_size, 0);
4469         tconn->send.seen_any_write_yet = false;
4470
4471         conn_info(tconn, "Connection closed\n");
4472
4473         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4474                 conn_try_outdate_peer_async(tconn);
4475
4476         spin_lock_irq(&tconn->req_lock);
4477         oc = tconn->cstate;
4478         if (oc >= C_UNCONNECTED)
4479                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4480
4481         spin_unlock_irq(&tconn->req_lock);
4482
4483         if (oc == C_DISCONNECTING)
4484                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4485 }
4486
4487 static int drbd_disconnected(struct drbd_conf *mdev)
4488 {
4489         unsigned int i;
4490
4491         /* wait for current activity to cease. */
4492         spin_lock_irq(&mdev->tconn->req_lock);
4493         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4494         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4495         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4496         spin_unlock_irq(&mdev->tconn->req_lock);
4497
4498         /* We do not have data structures that would allow us to
4499          * get the rs_pending_cnt down to 0 again.
4500          *  * On C_SYNC_TARGET we do not have any data structures describing
4501          *    the pending RSDataRequests we have sent.
4502          *  * On C_SYNC_SOURCE there is no data structure that tracks
4503          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4504          *  And no, it is not the sum of the reference counts in the
4505          *  resync_LRU. The resync_LRU tracks the whole operation including
4506          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4507          *  on the fly. */
4508         drbd_rs_cancel_all(mdev);
4509         mdev->rs_total = 0;
4510         mdev->rs_failed = 0;
4511         atomic_set(&mdev->rs_pending_cnt, 0);
4512         wake_up(&mdev->misc_wait);
4513
4514         del_timer_sync(&mdev->resync_timer);
4515         resync_timer_fn((unsigned long)mdev);
4516
4517         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4518          * w_make_resync_request etc. which may still be on the worker queue
4519          * to be "canceled" */
4520         drbd_flush_workqueue(mdev);
4521
4522         drbd_finish_peer_reqs(mdev);
4523
4524         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4525            might have queued work again. The one before drbd_finish_peer_reqs() is
4526            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4527         drbd_flush_workqueue(mdev);
4528
4529         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4530          * again via drbd_try_clear_on_disk_bm(). */
4531         drbd_rs_cancel_all(mdev);
4532
4533         kfree(mdev->p_uuid);
4534         mdev->p_uuid = NULL;
4535
4536         if (!drbd_suspended(mdev))
4537                 tl_clear(mdev->tconn);
4538
4539         drbd_md_sync(mdev);
4540
4541         /* serialize with bitmap writeout triggered by the state change,
4542          * if any. */
4543         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4544
4545         /* tcp_close and release of sendpage pages can be deferred.  I don't
4546          * want to use SO_LINGER, because apparently it can be deferred for
4547          * more than 20 seconds (longest time I checked).
4548          *
4549          * Actually we don't care for exactly when the network stack does its
4550          * put_page(), but release our reference on these pages right here.
4551          */
4552         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4553         if (i)
4554                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4555         i = atomic_read(&mdev->pp_in_use_by_net);
4556         if (i)
4557                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4558         i = atomic_read(&mdev->pp_in_use);
4559         if (i)
4560                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4561
4562         D_ASSERT(list_empty(&mdev->read_ee));
4563         D_ASSERT(list_empty(&mdev->active_ee));
4564         D_ASSERT(list_empty(&mdev->sync_ee));
4565         D_ASSERT(list_empty(&mdev->done_ee));
4566
4567         return 0;
4568 }
4569
4570 /*
4571  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4572  * we can agree on is stored in agreed_pro_version.
4573  *
4574  * feature flags and the reserved array should be enough room for future
4575  * enhancements of the handshake protocol, and possible plugins...
4576  *
4577  * for now, they are expected to be zero, but ignored.
4578  */
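/* Example of the negotiation below (protocol numbers are made up): if we
 * support 86..101 and the peer announces protocol_min = 86 and
 * protocol_max = 96, the ranges overlap and drbd_do_features() settles on
 * agreed_pro_version = min(101, 96) = 96.  Disjoint ranges lead to the
 * "incompatible DRBD dialects" error instead.
 */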
4579 static int drbd_send_features(struct drbd_tconn *tconn)
4580 {
4581         struct drbd_socket *sock;
4582         struct p_connection_features *p;
4583
4584         sock = &tconn->data;
4585         p = conn_prepare_command(tconn, sock);
4586         if (!p)
4587                 return -EIO;
4588         memset(p, 0, sizeof(*p));
4589         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4590         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4591         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4592 }
4593
4594 /*
4595  * return values:
4596  *   1 yes, we have a valid connection
4597  *   0 oops, did not work out, please try again
4598  *  -1 peer talks different language,
4599  *     no point in trying again, please go standalone.
4600  */
4601 static int drbd_do_features(struct drbd_tconn *tconn)
4602 {
4603         /* ASSERT current == tconn->receiver ... */
4604         struct p_connection_features *p;
4605         const int expect = sizeof(struct p_connection_features);
4606         struct packet_info pi;
4607         int err;
4608
4609         err = drbd_send_features(tconn);
4610         if (err)
4611                 return 0;
4612
4613         err = drbd_recv_header(tconn, &pi);
4614         if (err)
4615                 return 0;
4616
4617         if (pi.cmd != P_CONNECTION_FEATURES) {
4618                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4619                          cmdname(pi.cmd), pi.cmd);
4620                 return -1;
4621         }
4622
4623         if (pi.size != expect) {
4624                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4625                      expect, pi.size);
4626                 return -1;
4627         }
4628
4629         p = pi.data;
4630         err = drbd_recv_all_warn(tconn, p, expect);
4631         if (err)
4632                 return 0;
4633
4634         p->protocol_min = be32_to_cpu(p->protocol_min);
4635         p->protocol_max = be32_to_cpu(p->protocol_max);
4636         if (p->protocol_max == 0)
4637                 p->protocol_max = p->protocol_min;
4638
4639         if (PRO_VERSION_MAX < p->protocol_min ||
4640             PRO_VERSION_MIN > p->protocol_max)
4641                 goto incompat;
4642
4643         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4644
4645         conn_info(tconn, "Handshake successful: "
4646              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4647
4648         return 1;
4649
4650  incompat:
4651         conn_err(tconn, "incompatible DRBD dialects: "
4652             "I support %d-%d, peer supports %d-%d\n",
4653             PRO_VERSION_MIN, PRO_VERSION_MAX,
4654             p->protocol_min, p->protocol_max);
4655         return -1;
4656 }
4657
4658 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4659 static int drbd_do_auth(struct drbd_tconn *tconn)
4660 {
4661         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4662         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4663         return -1;
4664 }
4665 #else
4666 #define CHALLENGE_LEN 64
4667
4668 /* Return value:
4669         1 - auth succeeded,
4670         0 - failed, try again (network error),
4671         -1 - auth failed, don't try again.
4672 */
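/* The exchange implemented below, as a rough message-flow sketch (header
 * details omitted):
 *
 *   us   -> peer : P_AUTH_CHALLENGE, our random 64 byte challenge
 *   peer -> us   : P_AUTH_CHALLENGE, the peer's challenge
 *   us   -> peer : P_AUTH_RESPONSE,  HMAC(shared secret, peer's challenge)
 *   peer -> us   : P_AUTH_RESPONSE,  HMAC(shared secret, our challenge)
 *
 * We then recompute the HMAC over my_challenge locally (right_response)
 * and compare it against what the peer sent.
 */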
4673
4674 static int drbd_do_auth(struct drbd_tconn *tconn)
4675 {
4676         struct drbd_socket *sock;
4677         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4678         struct scatterlist sg;
4679         char *response = NULL;
4680         char *right_response = NULL;
4681         char *peers_ch = NULL;
4682         unsigned int key_len;
4683         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4684         unsigned int resp_size;
4685         struct hash_desc desc;
4686         struct packet_info pi;
4687         struct net_conf *nc;
4688         int err, rv;
4689
4690         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4691
4692         rcu_read_lock();
4693         nc = rcu_dereference(tconn->net_conf);
4694         key_len = strlen(nc->shared_secret);
4695         memcpy(secret, nc->shared_secret, key_len);
4696         rcu_read_unlock();
4697
4698         desc.tfm = tconn->cram_hmac_tfm;
4699         desc.flags = 0;
4700
4701         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4702         if (rv) {
4703                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4704                 rv = -1;
4705                 goto fail;
4706         }
4707
4708         get_random_bytes(my_challenge, CHALLENGE_LEN);
4709
4710         sock = &tconn->data;
4711         if (!conn_prepare_command(tconn, sock)) {
4712                 rv = 0;
4713                 goto fail;
4714         }
4715         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4716                                 my_challenge, CHALLENGE_LEN);
4717         if (!rv)
4718                 goto fail;
4719
4720         err = drbd_recv_header(tconn, &pi);
4721         if (err) {
4722                 rv = 0;
4723                 goto fail;
4724         }
4725
4726         if (pi.cmd != P_AUTH_CHALLENGE) {
4727                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4728                          cmdname(pi.cmd), pi.cmd);
4729                 rv = 0;
4730                 goto fail;
4731         }
4732
4733         if (pi.size > CHALLENGE_LEN * 2) {
4734                 conn_err(tconn, "AuthChallenge payload too big.\n");
4735                 rv = -1;
4736                 goto fail;
4737         }
4738
4739         peers_ch = kmalloc(pi.size, GFP_NOIO);
4740         if (peers_ch == NULL) {
4741                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4742                 rv = -1;
4743                 goto fail;
4744         }
4745
4746         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4747         if (err) {
4748                 rv = 0;
4749                 goto fail;
4750         }
4751
4752         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4753         response = kmalloc(resp_size, GFP_NOIO);
4754         if (response == NULL) {
4755                 conn_err(tconn, "kmalloc of response failed\n");
4756                 rv = -1;
4757                 goto fail;
4758         }
4759
4760         sg_init_table(&sg, 1);
4761         sg_set_buf(&sg, peers_ch, pi.size);
4762
4763         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4764         if (rv) {
4765                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4766                 rv = -1;
4767                 goto fail;
4768         }
4769
4770         if (!conn_prepare_command(tconn, sock)) {
4771                 rv = 0;
4772                 goto fail;
4773         }
4774         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4775                                 response, resp_size);
4776         if (!rv)
4777                 goto fail;
4778
4779         err = drbd_recv_header(tconn, &pi);
4780         if (err) {
4781                 rv = 0;
4782                 goto fail;
4783         }
4784
4785         if (pi.cmd != P_AUTH_RESPONSE) {
4786                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4787                          cmdname(pi.cmd), pi.cmd);
4788                 rv = 0;
4789                 goto fail;
4790         }
4791
4792         if (pi.size != resp_size) {
4793                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4794                 rv = 0;
4795                 goto fail;
4796         }
4797
4798         err = drbd_recv_all_warn(tconn, response, resp_size);
4799         if (err) {
4800                 rv = 0;
4801                 goto fail;
4802         }
4803
4804         right_response = kmalloc(resp_size, GFP_NOIO);
4805         if (right_response == NULL) {
4806                 conn_err(tconn, "kmalloc of right_response failed\n");
4807                 rv = -1;
4808                 goto fail;
4809         }
4810
4811         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4812
4813         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4814         if (rv) {
4815                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4816                 rv = -1;
4817                 goto fail;
4818         }
4819
4820         rv = !memcmp(response, right_response, resp_size);
4821
4822         if (rv)
4823                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4824                      resp_size);
4825         else
4826                 rv = -1;
4827
4828  fail:
4829         kfree(peers_ch);
4830         kfree(response);
4831         kfree(right_response);
4832
4833         return rv;
4834 }
4835 #endif
4836
4837 int drbdd_init(struct drbd_thread *thi)
4838 {
4839         struct drbd_tconn *tconn = thi->tconn;
4840         int h;
4841
4842         conn_info(tconn, "receiver (re)started\n");
4843
4844         do {
4845                 h = conn_connect(tconn);
4846                 if (h == 0) {
4847                         conn_disconnect(tconn);
4848                         schedule_timeout_interruptible(HZ);
4849                 }
4850                 if (h == -1) {
4851                         conn_warn(tconn, "Discarding network configuration.\n");
4852                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4853                 }
4854         } while (h == 0);
4855
4856         if (h > 0)
4857                 drbdd(tconn);
4858
4859         conn_disconnect(tconn);
4860
4861         conn_info(tconn, "receiver terminated\n");
4862         return 0;
4863 }
4864
4865 /* ********* acknowledge sender ******** */
4866
4867 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4868 {
4869         struct p_req_state_reply *p = pi->data;
4870         int retcode = be32_to_cpu(p->retcode);
4871
4872         if (retcode >= SS_SUCCESS) {
4873                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4874         } else {
4875                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4876                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4877                          drbd_set_st_err_str(retcode), retcode);
4878         }
4879         wake_up(&tconn->ping_wait);
4880
4881         return 0;
4882 }
4883
4884 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4885 {
4886         struct drbd_conf *mdev;
4887         struct p_req_state_reply *p = pi->data;
4888         int retcode = be32_to_cpu(p->retcode);
4889
4890         mdev = vnr_to_mdev(tconn, pi->vnr);
4891         if (!mdev)
4892                 return -EIO;
4893
4894         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4895                 D_ASSERT(tconn->agreed_pro_version < 100);
4896                 return got_conn_RqSReply(tconn, pi);
4897         }
4898
4899         if (retcode >= SS_SUCCESS) {
4900                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4901         } else {
4902                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4903                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4904                         drbd_set_st_err_str(retcode), retcode);
4905         }
4906         wake_up(&mdev->state_wait);
4907
4908         return 0;
4909 }
4910
4911 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4912 {
4913         return drbd_send_ping_ack(tconn);
4914
4915 }
4916
4917 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4918 {
4919         /* restore idle timeout */
4920         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4921         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4922                 wake_up(&tconn->ping_wait);
4923
4924         return 0;
4925 }
4926
4927 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4928 {
4929         struct drbd_conf *mdev;
4930         struct p_block_ack *p = pi->data;
4931         sector_t sector = be64_to_cpu(p->sector);
4932         int blksize = be32_to_cpu(p->blksize);
4933
4934         mdev = vnr_to_mdev(tconn, pi->vnr);
4935         if (!mdev)
4936                 return -EIO;
4937
4938         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4939
4940         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4941
4942         if (get_ldev(mdev)) {
4943                 drbd_rs_complete_io(mdev, sector);
4944                 drbd_set_in_sync(mdev, sector, blksize);
4945                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4946                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4947                 put_ldev(mdev);
4948         }
4949         dec_rs_pending(mdev);
4950         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4951
4952         return 0;
4953 }
4954
4955 static int
4956 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4957                               struct rb_root *root, const char *func,
4958                               enum drbd_req_event what, bool missing_ok)
4959 {
4960         struct drbd_request *req;
4961         struct bio_and_error m;
4962
4963         spin_lock_irq(&mdev->tconn->req_lock);
4964         req = find_request(mdev, root, id, sector, missing_ok, func);
4965         if (unlikely(!req)) {
4966                 spin_unlock_irq(&mdev->tconn->req_lock);
4967                 return -EIO;
4968         }
4969         __req_mod(req, what, &m);
4970         spin_unlock_irq(&mdev->tconn->req_lock);
4971
4972         if (m.bio)
4973                 complete_master_bio(mdev, &m);
4974         return 0;
4975 }
4976
4977 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4978 {
4979         struct drbd_conf *mdev;
4980         struct p_block_ack *p = pi->data;
4981         sector_t sector = be64_to_cpu(p->sector);
4982         int blksize = be32_to_cpu(p->blksize);
4983         enum drbd_req_event what;
4984
4985         mdev = vnr_to_mdev(tconn, pi->vnr);
4986         if (!mdev)
4987                 return -EIO;
4988
4989         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4990
4991         if (p->block_id == ID_SYNCER) {
4992                 drbd_set_in_sync(mdev, sector, blksize);
4993                 dec_rs_pending(mdev);
4994                 return 0;
4995         }
4996         switch (pi->cmd) {
4997         case P_RS_WRITE_ACK:
4998                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4999                 break;
5000         case P_WRITE_ACK:
5001                 what = WRITE_ACKED_BY_PEER;
5002                 break;
5003         case P_RECV_ACK:
5004                 what = RECV_ACKED_BY_PEER;
5005                 break;
5006         case P_SUPERSEDED:
5007                 what = CONFLICT_RESOLVED;
5008                 break;
5009         case P_RETRY_WRITE:
5010                 what = POSTPONE_WRITE;
5011                 break;
5012         default:
5013                 BUG();
5014         }
5015
5016         return validate_req_change_req_state(mdev, p->block_id, sector,
5017                                              &mdev->write_requests, __func__,
5018                                              what, false);
5019 }
5020
5021 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5022 {
5023         struct drbd_conf *mdev;
5024         struct p_block_ack *p = pi->data;
5025         sector_t sector = be64_to_cpu(p->sector);
5026         int size = be32_to_cpu(p->blksize);
5027         int err;
5028
5029         mdev = vnr_to_mdev(tconn, pi->vnr);
5030         if (!mdev)
5031                 return -EIO;
5032
5033         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5034
5035         if (p->block_id == ID_SYNCER) {
5036                 dec_rs_pending(mdev);
5037                 drbd_rs_failed_io(mdev, sector, size);
5038                 return 0;
5039         }
5040
5041         err = validate_req_change_req_state(mdev, p->block_id, sector,
5042                                             &mdev->write_requests, __func__,
5043                                             NEG_ACKED, true);
5044         if (err) {
5045                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5046                    The master bio might already be completed, therefore the
5047                    request is no longer in the collision hash. */
5048                 /* In Protocol B we might already have got a P_RECV_ACK
5049                    but then get a P_NEG_ACK afterwards. */
5050                 drbd_set_out_of_sync(mdev, sector, size);
5051         }
5052         return 0;
5053 }
5054
5055 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5056 {
5057         struct drbd_conf *mdev;
5058         struct p_block_ack *p = pi->data;
5059         sector_t sector = be64_to_cpu(p->sector);
5060
5061         mdev = vnr_to_mdev(tconn, pi->vnr);
5062         if (!mdev)
5063                 return -EIO;
5064
5065         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5066
5067         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5068             (unsigned long long)sector, be32_to_cpu(p->blksize));
5069
5070         return validate_req_change_req_state(mdev, p->block_id, sector,
5071                                              &mdev->read_requests, __func__,
5072                                              NEG_ACKED, false);
5073 }
5074
5075 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5076 {
5077         struct drbd_conf *mdev;
5078         sector_t sector;
5079         int size;
5080         struct p_block_ack *p = pi->data;
5081
5082         mdev = vnr_to_mdev(tconn, pi->vnr);
5083         if (!mdev)
5084                 return -EIO;
5085
5086         sector = be64_to_cpu(p->sector);
5087         size = be32_to_cpu(p->blksize);
5088
5089         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5090
5091         dec_rs_pending(mdev);
5092
5093         if (get_ldev_if_state(mdev, D_FAILED)) {
5094                 drbd_rs_complete_io(mdev, sector);
5095                 switch (pi->cmd) {
5096                 case P_NEG_RS_DREPLY:
5097                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
5098                 case P_RS_CANCEL:
5099                         break;
5100                 default:
5101                         BUG();
5102                 }
5103                 put_ldev(mdev);
5104         }
5105
5106         return 0;
5107 }
5108
5109 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5110 {
5111         struct p_barrier_ack *p = pi->data;
5112         struct drbd_conf *mdev;
5113         int vnr;
5114
5115         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5116
5117         rcu_read_lock();
5118         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5119                 if (mdev->state.conn == C_AHEAD &&
5120                     atomic_read(&mdev->ap_in_flight) == 0 &&
5121                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5122                         mdev->start_resync_timer.expires = jiffies + HZ;
5123                         add_timer(&mdev->start_resync_timer);
5124                 }
5125         }
5126         rcu_read_unlock();
5127
5128         return 0;
5129 }
5130
5131 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5132 {
5133         struct drbd_conf *mdev;
5134         struct p_block_ack *p = pi->data;
5135         struct drbd_work *w;
5136         sector_t sector;
5137         int size;
5138
5139         mdev = vnr_to_mdev(tconn, pi->vnr);
5140         if (!mdev)
5141                 return -EIO;
5142
5143         sector = be64_to_cpu(p->sector);
5144         size = be32_to_cpu(p->blksize);
5145
5146         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5147
5148         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5149                 drbd_ov_out_of_sync_found(mdev, sector, size);
5150         else
5151                 ov_out_of_sync_print(mdev);
5152
5153         if (!get_ldev(mdev))
5154                 return 0;
5155
5156         drbd_rs_complete_io(mdev, sector);
5157         dec_rs_pending(mdev);
5158
5159         --mdev->ov_left;
5160
5161         /* let's advance progress step marks only for every other megabyte */
5162         if ((mdev->ov_left & 0x200) == 0x200)
5163                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5164
5165         if (mdev->ov_left == 0) {
5166                 w = kmalloc(sizeof(*w), GFP_NOIO);
5167                 if (w) {
5168                         w->cb = w_ov_finished;
5169                         w->mdev = mdev;
5170                         drbd_queue_work(&mdev->tconn->sender_work, w);
5171                 } else {
5172                         dev_err(DEV, "kmalloc(w) failed.");
5173                         ov_out_of_sync_print(mdev);
5174                         drbd_resync_finished(mdev);
5175                 }
5176         }
5177         put_ldev(mdev);
5178         return 0;
5179 }
5180
5181 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5182 {
5183         return 0;
5184 }
5185
5186 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5187 {
5188         struct drbd_conf *mdev;
5189         int vnr, not_empty = 0;
5190
5191         do {
5192                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5193                 flush_signals(current);
5194
5195                 rcu_read_lock();
5196                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5197                         kref_get(&mdev->kref);
5198                         rcu_read_unlock();
5199                         if (drbd_finish_peer_reqs(mdev)) {
5200                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5201                                 return 1;
5202                         }
5203                         kref_put(&mdev->kref, &drbd_minor_destroy);
5204                         rcu_read_lock();
5205                 }
5206                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5207
5208                 spin_lock_irq(&tconn->req_lock);
5209                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5210                         not_empty = !list_empty(&mdev->done_ee);
5211                         if (not_empty)
5212                                 break;
5213                 }
5214                 spin_unlock_irq(&tconn->req_lock);
5215                 rcu_read_unlock();
5216         } while (not_empty);
5217
5218         return 0;
5219 }
5220
5221 struct asender_cmd {
5222         size_t pkt_size;
5223         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5224 };
5225
5226 static struct asender_cmd asender_tbl[] = {
5227         [P_PING]            = { 0, got_Ping },
5228         [P_PING_ACK]        = { 0, got_PingAck },
5229         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5230         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5231         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5232         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5233         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5234         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5235         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5236         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5237         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5238         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5239         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5240         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5241         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5242         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5243         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5244 };
5245
5246 int drbd_asender(struct drbd_thread *thi)
5247 {
5248         struct drbd_tconn *tconn = thi->tconn;
5249         struct asender_cmd *cmd = NULL;
5250         struct packet_info pi;
5251         int rv;
5252         void *buf    = tconn->meta.rbuf;
5253         int received = 0;
5254         unsigned int header_size = drbd_header_size(tconn);
5255         int expect   = header_size;
5256         bool ping_timeout_active = false;
5257         struct net_conf *nc;
5258         int ping_timeo, tcp_cork, ping_int;
5259
5260         current->policy = SCHED_RR;  /* Make this a realtime task! */
5261         current->rt_priority = 2;    /* more important than all other tasks */
5262
5263         while (get_t_state(thi) == RUNNING) {
5264                 drbd_thread_current_set_cpu(thi);
5265
5266                 rcu_read_lock();
5267                 nc = rcu_dereference(tconn->net_conf);
5268                 ping_timeo = nc->ping_timeo;
5269                 tcp_cork = nc->tcp_cork;
5270                 ping_int = nc->ping_int;
5271                 rcu_read_unlock();
5272
5273                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5274                         if (drbd_send_ping(tconn)) {
5275                                 conn_err(tconn, "drbd_send_ping has failed\n");
5276                                 goto reconnect;
5277                         }
5278                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5279                         ping_timeout_active = true;
5280                 }
5281
5282                 /* TODO: conditionally cork; it may hurt latency if we cork without
5283                    much to send */
5284                 if (tcp_cork)
5285                         drbd_tcp_cork(tconn->meta.socket);
5286                 if (tconn_finish_peer_reqs(tconn)) {
5287                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5288                         goto reconnect;
5289                 }
5290                 /* but unconditionally uncork unless disabled */
5291                 if (tcp_cork)
5292                         drbd_tcp_uncork(tconn->meta.socket);
5293
5294                 /* short circuit, recv_msg would return EINTR anyways. */
5295                 if (signal_pending(current))
5296                         continue;
5297
5298                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5299                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5300
5301                 flush_signals(current);
5302
5303                 /* Note:
5304                  * -EINTR        (on meta) we got a signal
5305                  * -EAGAIN       (on meta) rcvtimeo expired
5306                  * -ECONNRESET   other side closed the connection
5307                  * -ERESTARTSYS  (on data) we got a signal
5308                  * rv <  0       other than above: unexpected error!
5309                  * rv == expected: full header or command
5310                  * rv <  expected: "woken" by signal during receive
5311                  * rv == 0       : "connection shut down by peer"
5312                  */
5313                 if (likely(rv > 0)) {
5314                         received += rv;
5315                         buf      += rv;
5316                 } else if (rv == 0) {
5317                         if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
5318                                 long t;
5319                                 rcu_read_lock();
5320                                 t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
5321                                 rcu_read_unlock();
5322
5323                                 t = wait_event_timeout(tconn->ping_wait,
5324                                                        tconn->cstate < C_WF_REPORT_PARAMS,
5325                                                        t);
5326                                 if (t)
5327                                         break;
5328                         }
5329                         conn_err(tconn, "meta connection shut down by peer.\n");
5330                         goto reconnect;
5331                 } else if (rv == -EAGAIN) {
5332                         /* If the data socket received something meanwhile,
5333                          * that is good enough: peer is still alive. */
5334                         if (time_after(tconn->last_received,
5335                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5336                                 continue;
5337                         if (ping_timeout_active) {
5338                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5339                                 goto reconnect;
5340                         }
5341                         set_bit(SEND_PING, &tconn->flags);
5342                         continue;
5343                 } else if (rv == -EINTR) {
5344                         continue;
5345                 } else {
5346                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5347                         goto reconnect;
5348                 }
5349
5350                 if (received == expect && cmd == NULL) {
5351                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5352                                 goto reconnect;
5353                         cmd = &asender_tbl[pi.cmd];
5354                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5355                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5356                                          cmdname(pi.cmd), pi.cmd);
5357                                 goto disconnect;
5358                         }
5359                         expect = header_size + cmd->pkt_size;
5360                         if (pi.size != expect - header_size) {
5361                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5362                                         pi.cmd, pi.size);
5363                                 goto reconnect;
5364                         }
5365                 }
5366                 if (received == expect) {
5367                         bool err;
5368
5369                         err = cmd->fn(tconn, &pi);
5370                         if (err) {
5371                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5372                                 goto reconnect;
5373                         }
5374
5375                         tconn->last_received = jiffies;
5376
5377                         if (cmd == &asender_tbl[P_PING_ACK]) {
5378                                 /* restore idle timeout */
5379                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5380                                 ping_timeout_active = false;
5381                         }
5382
5383                         buf      = tconn->meta.rbuf;
5384                         received = 0;
5385                         expect   = header_size;
5386                         cmd      = NULL;
5387                 }
5388         }
5389
5390         if (0) {
5391 reconnect:
5392                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5393                 conn_md_sync(tconn);
5394         }
5395         if (0) {
5396 disconnect:
5397                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5398         }
5399         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5400
5401         conn_info(tconn, "asender terminated\n");
5402
5403         return 0;
5404 }