/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
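/* Opportunistic allocation flags for receive buffers: highmem pages are
 * fine here, and __GFP_NOWARN keeps a failed attempt quiet -- the callers
 * check for NULL and fall back or retry instead. */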

/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

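/* Typical pattern for returning a private chain to the global pool:
 * find the tail with page_chain_tail() outside any lock, then splice it
 * onto drbd_pp_pool with page_chain_add() under drbd_pp_lock, as
 * __drbd_alloc_pages() and drbd_free_pages() below do. */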
static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req;
        struct list_head *le, *tle;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first one that has not
           finished, we can stop examining the list... */

        list_for_each_safe(le, tle, &mdev->net_ee) {
                peer_req = list_entry(le, struct drbd_peer_request, w.list);
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(le, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:       DRBD device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
                              bool retry)
{
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        int mxb;

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyways. */
        rcu_read_lock();
        nc = rcu_dereference(mdev->tconn->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&mdev->pp_in_use) < mxb)
                page = __drbd_alloc_pages(mdev, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(mdev);

                if (atomic_read(&mdev->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(mdev, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &mdev->pp_in_use);
        return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock)
 * section. Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
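        /* Pages went back to the pool (or the system); wake up anyone
         * sleeping on drbd_pp_wait in drbd_alloc_pages(). */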
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
                    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

        if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (data_size) {
                page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
        }

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->w.mdev = mdev;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(mdev, peer_req->pages, is_net);
        D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &mdev->net_ee;

        spin_lock_irq(&mdev->tconn->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(mdev, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        list_splice_init(&mdev->done_ee, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(mdev, peer_req);
        }
        wake_up(&mdev->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&mdev->tconn->req_lock);
                io_schedule();
                finish_wait(&mdev->ee_wait, &wait);
                spin_lock_irq(&mdev->tconn->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                    struct list_head *head)
{
        spin_lock_irq(&mdev->tconn->req_lock);
        _drbd_wait_ee_list_empty(mdev, head);
        spin_unlock_irq(&mdev->tconn->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(tconn->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        conn_info(tconn, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        conn_err(tconn, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
                if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                conn_info(tconn, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv(tconn, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(tconn, buf, size);
        if (err && !signal_pending(current))
                conn_warn(tconn, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}

static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &tconn->my_addr, my_addr_len);

        if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_tconn *tconn;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}

static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &tconn->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
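        /* E.g. with connect_int=10 and HZ=250: timeo starts at 2500 jiffies
         * and becomes either 2143 or 2857, so two nodes that try to connect
         * to each other simultaneously are unlikely to retry in lock-step. */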

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        conn_err(tconn, "accept failed, err = %d\n", err);
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(tconn, sock))
                return -EIO;
        return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(tconn);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(tconn, tconn->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

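        /* Non-blocking peek; it does not consume data from the stream.
         * Pending bytes or -EAGAIN mean the connection is still alive;
         * 0 (orderly shutdown) or any other error means it is gone. */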
        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_conf *mdev)
{
        int err;

        atomic_set(&mdev->packet_seq, 0);
        mdev->peer_seq = 0;

        mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
                &mdev->tconn->cstate_mutex :
                &mdev->own_state_mutex;

        err = drbd_send_sync_param(mdev);
        if (!err)
                err = drbd_send_sizes(mdev, 0, 0);
        if (!err)
                err = drbd_send_uuids(mdev);
        if (!err)
                err = drbd_send_current_state(mdev);
        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
        clear_bit(RESIZE_PENDING, &mdev->flags);
        mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_tconn *tconn)
{
        struct drbd_socket sock, msock;
        struct drbd_conf *mdev;
        struct net_conf *nc;
        int vnr, timeout, h, ok;
        bool discard_my_data;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .tconn = tconn,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &tconn->flags);
        if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = tconn->data.sbuf;
        sock.rbuf = tconn->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = tconn->meta.sbuf;
        msock.rbuf = tconn->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        tconn->agreed_pro_version = 80;

        if (prepare_listen_socket(tconn, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(tconn);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(tconn, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
                                msock.socket = s;
                                send_first_packet(tconn, &msock, P_INITIAL_META);
                        } else {
                                conn_err(tconn, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock.socket && msock.socket) {
                        rcu_read_lock();
                        nc = rcu_dereference(tconn->net_conf);
                        timeout = nc->ping_timeo * HZ / 10;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeout);
                        ok = drbd_socket_okay(&sock.socket);
                        ok = drbd_socket_okay(&msock.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(tconn, &ad);
                if (s) {
                        int fp = receive_first_packet(tconn, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        conn_warn(tconn, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &tconn->flags);
                                if (msock.socket) {
                                        conn_warn(tconn, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                conn_warn(tconn, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                if (random32() & 1)
                                        goto retry;
                        }
                }

                if (tconn->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&tconn->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = drbd_socket_okay(&sock.socket);
                ok = drbd_socket_okay(&msock.socket) && ok;
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        tconn->data.socket = sock.socket;
        tconn->meta.socket = msock.socket;
        tconn->last_received = jiffies;

        h = drbd_do_features(tconn);
        if (h <= 0)
                return h;

        if (tconn->cram_hmac_tfm) {
                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
                switch (drbd_do_auth(tconn)) {
                case -1:
                        conn_err(tconn, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        conn_err(tconn, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        tconn->data.socket->sk->sk_sndtimeo = timeout;
        tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
                return -1;

        set_bit(STATE_SENT, &tconn->flags);

        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                kref_get(&mdev->kref);
                /* Prevent a race between resync-handshake and
                 * being promoted to Primary.
                 *
                 * Grab and release the state mutex, so we know that any current
                 * drbd_set_role() is finished, and any incoming drbd_set_role
                 * will see the STATE_SENT flag, and wait for it to be cleared.
                 */
                mutex_lock(mdev->state_mutex);
                mutex_unlock(mdev->state_mutex);

                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &mdev->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &mdev->flags);

                drbd_connected(mdev);
                kref_put(&mdev->kref, &drbd_minor_destroy);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || tconn->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &tconn->flags);
                return 0;
        }

        drbd_thread_start(&tconn->asender);

        mutex_lock(&tconn->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        tconn->net_conf->discard_my_data = 0;
        mutex_unlock(&tconn->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}

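/* The wire header format depends on the agreed protocol version:
 * p_header100 (protocol 100+) has a 32-bit magic, a 16-bit volume number,
 * a 16-bit command and a 32-bit length; p_header95 is recognized by the
 * 16-bit DRBD_MAGIC_BIG and carries a 32-bit length; the classic
 * p_header80 has only a 16-bit length and no volume number, so vnr
 * defaults to 0 for the older formats. */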
static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(tconn);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        conn_err(tconn, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         tconn->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
        void *buffer = tconn->data.rbuf;
        int err;

        err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
        if (err)
                return err;

        err = decode_header(tconn, buffer, pi);
        tconn->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_tconn *tconn)
{
        int rv;
        struct drbd_conf *mdev;
        int vnr;

        if (tconn->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                        if (!get_ldev(mdev))
                                continue;
                        kref_get(&mdev->kref);
                        rcu_read_unlock();

                        rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        if (rv) {
                                dev_info(DEV, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(tconn, WO_drain_io);
                        }
                        put_ldev(mdev);
                        kref_put(&mdev->kref, &drbd_minor_destroy);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @tconn:      DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&tconn->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&tconn->epoch_lock);
                                drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
                                spin_lock(&tconn->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->tconn);
#endif

                        if (tconn->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                tconn->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&tconn->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @tconn:      DRBD connection.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
{
        struct disk_conf *dc;
        struct drbd_conf *mdev;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = tconn->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                if (!get_ldev_if_state(mdev, D_ATTACHING))
                        continue;
                dc = rcu_dereference(mdev->ldev->disk_conf);

                if (wo == WO_bdev_flush && !dc->disk_flushes)
                        wo = WO_drain_io;
                if (wo == WO_drain_io && !dc->disk_drain)
                        wo = WO_none;
                put_ldev(mdev);
        }
        rcu_read_unlock();
        tconn->write_ordering = wo;
        if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
                conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
}

/**
 * drbd_submit_peer_request() - Submit one or more bios for a peer request
 * @mdev:       DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 * @fault_type: type tag for fault injection, see drbd_insert_fault()
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
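        /* rounded up: with 4 KiB pages, a 9-sector (4608 byte)
         * request still needs two pages */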
        int err = -ENOMEM;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyways,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                dev_err(DEV,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (unsigned long long)bio->bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(page == NULL);
        D_ASSERT(ds == 0);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(mdev, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}

static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
                                             struct drbd_peer_request *peer_req)
{
        struct drbd_interval *i = &peer_req->i;

        drbd_remove_interval(&mdev->write_requests, i);
        drbd_clear_interval(i);

        /* Wake up any processes waiting for this peer request to complete.  */
        if (i->waiting)
                wake_up(&mdev->misc_wait);
}

void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
{
        struct drbd_conf *mdev;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                kref_get(&mdev->kref);
                rcu_read_unlock();
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                kref_put(&mdev->kref, &drbd_minor_destroy);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
{
        int rv;
        struct p_barrier *p = pi->data;
        struct drbd_epoch *epoch;

        /* FIXME these are unacked on connection,
         * not a specific (peer)device.
         */
        tconn->current_epoch->barrier_nr = p->barrier;
        tconn->current_epoch->tconn = tconn;
        rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (tconn->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return 0;

                /* receiver context, in the writeout path of the other node.
                 * avoid potential distributed deadlock */
                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                if (epoch)
                        break;
                else
                        conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
                        /* Fall through */

        case WO_bdev_flush:
        case WO_drain_io:
                conn_wait_active_ee_empty(tconn);
                drbd_flush(tconn);

                if (atomic_read(&tconn->current_epoch->epoch_size)) {
                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                        if (epoch)
                                break;
                }

                return 0;
        default:
                conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
                return -EIO;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&tconn->epoch_lock);
        if (atomic_read(&tconn->current_epoch->epoch_size)) {
                list_add(&epoch->list, &tconn->current_epoch->list);
                tconn->current_epoch = epoch;
                tconn->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&tconn->epoch_lock);

        return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
              int data_size) __must_hold(local)
{
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_peer_request *peer_req;
        struct page *page;
        int dgs, ds, err;
        void *dig_in = mdev->tconn->int_dig_in;
        void *dig_vv = mdev->tconn->int_dig_vv;
        unsigned long *data;

        dgs = 0;
        if (mdev->tconn->peer_integrity_tfm) {
                dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
                /*
                 * FIXME: Receive the incoming digest into the receive buffer
                 *        here, together with its struct p_data?
                 */
                err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
                if (err)
                        return NULL;
                data_size -= dgs;
        }

        if (!expect(IS_ALIGNED(data_size, 512)))
                return NULL;
        if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
                return NULL;

1514         /* even though we trust our peer,
1515          * we sometimes have to double check. */
1516         if (sector + (data_size>>9) > capacity) {
1517                 dev_err(DEV, "request from peer beyond end of local disk: "
1518                         "capacity: %llus < sector: %llus + size: %u\n",
1519                         (unsigned long long)capacity,
1520                         (unsigned long long)sector, data_size);
1521                 return NULL;
1522         }
1523
1524         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1525          * "criss-cross" setup, that might cause write-out on some other DRBD,
1526          * which in turn might block on the other node at this very place.  */
1527         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1528         if (!peer_req)
1529                 return NULL;
1530
1531         if (!data_size)
1532                 return peer_req;
1533
1534         ds = data_size;
1535         page = peer_req->pages;
1536         page_chain_for_each(page) {
1537                 unsigned len = min_t(int, ds, PAGE_SIZE);
1538                 data = kmap(page);
1539                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1540                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1541                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1542                         data[0] = data[0] ^ (unsigned long)-1;
1543                 }
1544                 kunmap(page);
1545                 if (err) {
1546                         drbd_free_peer_req(mdev, peer_req);
1547                         return NULL;
1548                 }
1549                 ds -= len;
1550         }
1551
1552         if (dgs) {
1553                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1554                 if (memcmp(dig_in, dig_vv, dgs)) {
1555                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1556                                 (unsigned long long)sector, data_size);
1557                         drbd_free_peer_req(mdev, peer_req);
1558                         return NULL;
1559                 }
1560         }
1561         mdev->recv_cnt += data_size>>9;
1562         return peer_req;
1563 }
1564
1565 /* drbd_drain_block() just takes a data block
1566  * out of the socket input buffer, and discards it.
1567  */
1568 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1569 {
1570         struct page *page;
1571         int err = 0;
1572         void *data;
1573
1574         if (!data_size)
1575                 return 0;
1576
1577         page = drbd_alloc_pages(mdev, 1, 1);
1578
1579         data = kmap(page);
1580         while (data_size) {
1581                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1582
1583                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1584                 if (err)
1585                         break;
1586                 data_size -= len;
1587         }
1588         kunmap(page);
1589         drbd_free_pages(mdev, page, 0);
1590         return err;
1591 }
1592
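/* recv_dless_read(): receive the payload of a data reply directly into the
 * pages of the request's master bio -- the read was served by the peer, e.g.
 * because we have no usable local disk ("disk-less" read). */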
1593 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1594                            sector_t sector, int data_size)
1595 {
1596         struct bio_vec *bvec;
1597         struct bio *bio;
1598         int dgs, err, i, expect;
1599         void *dig_in = mdev->tconn->int_dig_in;
1600         void *dig_vv = mdev->tconn->int_dig_vv;
1601
1602         dgs = 0;
1603         if (mdev->tconn->peer_integrity_tfm) {
1604                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1605                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1606                 if (err)
1607                         return err;
1608                 data_size -= dgs;
1609         }
1610
1611         /* optimistically update recv_cnt.  if receiving fails below,
1612          * we disconnect anyways, and counters will be reset. */
1613         mdev->recv_cnt += data_size>>9;
1614
1615         bio = req->master_bio;
1616         D_ASSERT(sector == bio->bi_sector);
1617
1618         bio_for_each_segment(bvec, bio, i) {
1619                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1620                 expect = min_t(int, data_size, bvec->bv_len);
1621                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1622                 kunmap(bvec->bv_page);
1623                 if (err)
1624                         return err;
1625                 data_size -= expect;
1626         }
1627
1628         if (dgs) {
1629                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1630                 if (memcmp(dig_in, dig_vv, dgs)) {
1631                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1632                         return -EINVAL;
1633                 }
1634         }
1635
1636         D_ASSERT(data_size == 0);
1637         return 0;
1638 }
1639
1640 /*
1641  * e_end_resync_block() is called in asender context via
1642  * drbd_finish_peer_reqs().
1643  */
1644 static int e_end_resync_block(struct drbd_work *w, int unused)
1645 {
1646         struct drbd_peer_request *peer_req =
1647                 container_of(w, struct drbd_peer_request, w);
1648         struct drbd_conf *mdev = w->mdev;
1649         sector_t sector = peer_req->i.sector;
1650         int err;
1651
1652         D_ASSERT(drbd_interval_empty(&peer_req->i));
1653
1654         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1655                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1656                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1657         } else {
1658                 /* Record failure to sync */
1659                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1660
1661                 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1662         }
1663         dec_unacked(mdev);
1664
1665         return err;
1666 }
1667
1668 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1669 {
1670         struct drbd_peer_request *peer_req;
1671
1672         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1673         if (!peer_req)
1674                 goto fail;
1675
1676         dec_rs_pending(mdev);
1677
1678         inc_unacked(mdev);
1679         /* corresponding dec_unacked() in e_end_resync_block()
1680          * respective _drbd_clear_done_ee */
1681
1682         peer_req->w.cb = e_end_resync_block;
1683
1684         spin_lock_irq(&mdev->tconn->req_lock);
1685         list_add(&peer_req->w.list, &mdev->sync_ee);
1686         spin_unlock_irq(&mdev->tconn->req_lock);
1687
1688         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1689         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1690                 return 0;
1691
1692         /* don't care for the reason here */
1693         dev_err(DEV, "submit failed, triggering re-connect\n");
1694         spin_lock_irq(&mdev->tconn->req_lock);
1695         list_del(&peer_req->w.list);
1696         spin_unlock_irq(&mdev->tconn->req_lock);
1697
1698         drbd_free_peer_req(mdev, peer_req);
1699 fail:
1700         put_ldev(mdev);
1701         return -EIO;
1702 }
1703
1704 static struct drbd_request *
1705 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1706              sector_t sector, bool missing_ok, const char *func)
1707 {
1708         struct drbd_request *req;
1709
1710         /* Request object according to our peer */
1711         req = (struct drbd_request *)(unsigned long)id;
1712         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
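        /* The id is a pointer we handed to the peer and it echoed back; only
         * trust it once drbd_contains_interval() has confirmed that the
         * interval it points to is actually present in our request tree. */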
1713                 return req;
1714         if (!missing_ok) {
1715                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1716                         (unsigned long)id, (unsigned long long)sector);
1717         }
1718         return NULL;
1719 }
1720
1721 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1722 {
1723         struct drbd_conf *mdev;
1724         struct drbd_request *req;
1725         sector_t sector;
1726         int err;
1727         struct p_data *p = pi->data;
1728
1729         mdev = vnr_to_mdev(tconn, pi->vnr);
1730         if (!mdev)
1731                 return -EIO;
1732
1733         sector = be64_to_cpu(p->sector);
1734
1735         spin_lock_irq(&mdev->tconn->req_lock);
1736         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1737         spin_unlock_irq(&mdev->tconn->req_lock);
1738         if (unlikely(!req))
1739                 return -EIO;
1740
1741         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1742          * special casing it there for the various failure cases.
1743          * still no race with drbd_fail_pending_reads */
1744         err = recv_dless_read(mdev, req, sector, pi->size);
1745         if (!err)
1746                 req_mod(req, DATA_RECEIVED);
1747         /* else: nothing. handled from drbd_disconnect...
1748          * I don't think we may complete this just yet
1749          * in case we are "on-disconnect: freeze" */
1750
1751         return err;
1752 }
1753
1754 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1755 {
1756         struct drbd_conf *mdev;
1757         sector_t sector;
1758         int err;
1759         struct p_data *p = pi->data;
1760
1761         mdev = vnr_to_mdev(tconn, pi->vnr);
1762         if (!mdev)
1763                 return -EIO;
1764
1765         sector = be64_to_cpu(p->sector);
1766         D_ASSERT(p->block_id == ID_SYNCER);
1767
1768         if (get_ldev(mdev)) {
1769                 /* data is submitted to disk within recv_resync_read.
1770                  * corresponding put_ldev done below on error,
1771                  * or in drbd_peer_request_endio. */
1772                 err = recv_resync_read(mdev, sector, pi->size);
1773         } else {
1774                 if (__ratelimit(&drbd_ratelimit_state))
1775                         dev_err(DEV, "Can not write resync data to local disk.\n");
1776
1777                 err = drbd_drain_block(mdev, pi->size);
1778
1779                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1780         }
1781
1782         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1783
1784         return err;
1785 }
1786
1787 static void restart_conflicting_writes(struct drbd_conf *mdev,
1788                                        sector_t sector, int size)
1789 {
1790         struct drbd_interval *i;
1791         struct drbd_request *req;
1792
1793         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1794                 if (!i->local)
1795                         continue;
1796                 req = container_of(i, struct drbd_request, i);
1797                 if (req->rq_state & RQ_LOCAL_PENDING ||
1798                     !(req->rq_state & RQ_POSTPONED))
1799                         continue;
1800                 /* as it is RQ_POSTPONED, this will cause it to
1801                  * be queued on the retry workqueue. */
1802                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1803         }
1804 }
1805
1806 /*
1807  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1808  */
1809 static int e_end_block(struct drbd_work *w, int cancel)
1810 {
1811         struct drbd_peer_request *peer_req =
1812                 container_of(w, struct drbd_peer_request, w);
1813         struct drbd_conf *mdev = w->mdev;
1814         sector_t sector = peer_req->i.sector;
1815         int err = 0, pcmd;
1816
1817         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1818                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1819                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1820                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1821                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1822                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1823                         err = drbd_send_ack(mdev, pcmd, peer_req);
1824                         if (pcmd == P_RS_WRITE_ACK)
1825                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1826                 } else {
1827                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1828                         /* we expect it to be marked out of sync anyways...
1829                          * maybe assert this?  */
1830                 }
1831                 dec_unacked(mdev);
1832         }
1833         /* we delete from the conflict detection hash _after_ we sent out the
1834          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1835         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1836                 spin_lock_irq(&mdev->tconn->req_lock);
1837                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1838                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1839                 if (peer_req->flags & EE_RESTART_REQUESTS)
1840                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1841                 spin_unlock_irq(&mdev->tconn->req_lock);
1842         } else
1843                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1844
1845         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1846
1847         return err;
1848 }
1849
1850 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1851 {
1852         struct drbd_conf *mdev = w->mdev;
1853         struct drbd_peer_request *peer_req =
1854                 container_of(w, struct drbd_peer_request, w);
1855         int err;
1856
1857         err = drbd_send_ack(mdev, ack, peer_req);
1858         dec_unacked(mdev);
1859
1860         return err;
1861 }
1862
1863 static int e_send_superseded(struct drbd_work *w, int unused)
1864 {
1865         return e_send_ack(w, P_SUPERSEDED);
1866 }
1867
1868 static int e_send_retry_write(struct drbd_work *w, int unused)
1869 {
1870         struct drbd_tconn *tconn = w->mdev->tconn;
1871
1872         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1873                              P_RETRY_WRITE : P_SUPERSEDED);
1874 }
1875
1876 static bool seq_greater(u32 a, u32 b)
1877 {
1878         /*
1879          * We assume 32-bit wrap-around here.
1880          * For 24-bit wrap-around, we would have to shift:
1881          *  a <<= 8; b <<= 8;
1882          */
1883         return (s32)a - (s32)b > 0;
1884 }
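/* Illustration (not part of the driver): seq_greater(1, 0xffffffff) is true,
 * since (s32)1 - (s32)0xffffffff == 2 > 0 -- a freshly wrapped 1 counts as
 * "newer" than the pre-wrap 0xffffffff. */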
1885
1886 static u32 seq_max(u32 a, u32 b)
1887 {
1888         return seq_greater(a, b) ? a : b;
1889 }
1890
1891 static bool need_peer_seq(struct drbd_conf *mdev)
1892 {
1893         struct drbd_tconn *tconn = mdev->tconn;
1894         int tp;
1895
1896         /*
1897          * We only need to keep track of the last packet_seq number of our peer
1898          * if we are in dual-primary mode and we have the resolve-conflicts flag set; see
1899          * handle_write_conflicts().
1900          */
1901
1902         rcu_read_lock();
1903         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1904         rcu_read_unlock();
1905
1906         return tp && test_bit(RESOLVE_CONFLICTS, &tconn->flags);
1907 }
1908
1909 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1910 {
1911         unsigned int newest_peer_seq;
1912
1913         if (need_peer_seq(mdev)) {
1914                 spin_lock(&mdev->peer_seq_lock);
1915                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1916                 mdev->peer_seq = newest_peer_seq;
1917                 spin_unlock(&mdev->peer_seq_lock);
1918                 /* wake up only if we actually changed mdev->peer_seq */
1919                 if (peer_seq == newest_peer_seq)
1920                         wake_up(&mdev->seq_wait);
1921         }
1922 }
1923
1924 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1925 {
1926         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1927 }
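/* Note the units: s1/s2 are sector numbers, l1/l2 are byte lengths.
 * E.g. overlaps(0, 4096, 7, 512) is true (sectors [0,8) and [7,8) share
 * sector 7), while overlaps(0, 4096, 8, 512) is false. */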
1928
1929 /* maybe change sync_ee into interval trees as well? */
1930 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1931 {
1932         struct drbd_peer_request *rs_req;
1933         bool rv = false;
1934
1935         spin_lock_irq(&mdev->tconn->req_lock);
1936         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1937                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1938                              rs_req->i.sector, rs_req->i.size)) {
1939                         rv = true;
1940                         break;
1941                 }
1942         }
1943         spin_unlock_irq(&mdev->tconn->req_lock);
1944
1945         return rv;
1946 }
1947
1948 /* Called from receive_Data.
1949  * Synchronize packets on sock with packets on msock.
1950  *
1951  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1952  * packet traveling on msock, they are still processed in the order they have
1953  * been sent.
1954  *
1955  * Note: we don't care for Ack packets overtaking P_DATA packets.
1956  *
1957  * In case packet_seq is larger than mdev->peer_seq number, there are
1958  * outstanding packets on the msock. We wait for them to arrive.
1959  * In case we are the logically next packet, we update mdev->peer_seq
1960  * ourselves. Correctly handles 32bit wrap around.
1961  *
1962  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1963  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1964  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1965  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1966  *
1967  * returns 0 if we may process the packet,
1968  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1969 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1970 {
1971         DEFINE_WAIT(wait);
1972         long timeout;
1973         int ret;
1974
1975         if (!need_peer_seq(mdev))
1976                 return 0;
1977
1978         spin_lock(&mdev->peer_seq_lock);
1979         for (;;) {
1980                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1981                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1982                         ret = 0;
1983                         break;
1984                 }
1985                 if (signal_pending(current)) {
1986                         ret = -ERESTARTSYS;
1987                         break;
1988                 }
1989                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1990                 spin_unlock(&mdev->peer_seq_lock);
1991                 rcu_read_lock();
1992                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1993                 rcu_read_unlock();
1994                 timeout = schedule_timeout(timeout);
1995                 spin_lock(&mdev->peer_seq_lock);
1996                 if (!timeout) {
1997                         ret = -ETIMEDOUT;
1998                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1999                         break;
2000                 }
2001         }
2002         spin_unlock(&mdev->peer_seq_lock);
2003         finish_wait(&mdev->seq_wait, &wait);
2004         return ret;
2005 }
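/* A worked example of the scheme above: with mdev->peer_seq == 5, a packet
 * carrying peer_seq == 6 is the logically next one (seq_greater(5, 5) is
 * false) and is processed immediately; a packet carrying peer_seq == 8 makes
 * us sleep until the packets with seq 6 and 7 have arrived on the msock and
 * woken us via update_peer_seq(). */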
2006
2007 /* see also bio_flags_to_wire():
2008  * we need to semantically map bio REQ_* flags to data packet DP_* flags
2009  * and back, because we may replicate to peers on other kernel versions. */
2010 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2011 {
2012         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2013                 (dpf & DP_FUA ? REQ_FUA : 0) |
2014                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2015                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2016 }
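/* E.g. a P_DATA packet flagged DP_FUA|DP_FLUSH is resubmitted locally as a
 * bio with REQ_FUA|REQ_FLUSH, so the peer's durability and ordering
 * semantics survive the trip over the wire. */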
2017
2018 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2019                                     unsigned int size)
2020 {
2021         struct drbd_interval *i;
2022
2023     repeat:
2024         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2025                 struct drbd_request *req;
2026                 struct bio_and_error m;
2027
2028                 if (!i->local)
2029                         continue;
2030                 req = container_of(i, struct drbd_request, i);
2031                 if (!(req->rq_state & RQ_POSTPONED))
2032                         continue;
2033                 req->rq_state &= ~RQ_POSTPONED;
2034                 __req_mod(req, NEG_ACKED, &m);
2035                 spin_unlock_irq(&mdev->tconn->req_lock);
2036                 if (m.bio)
2037                         complete_master_bio(mdev, &m);
2038                 spin_lock_irq(&mdev->tconn->req_lock);
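                /* dropping req_lock above may have invalidated the
                 * interval-tree walk, so restart it from the top */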
2039                 goto repeat;
2040         }
2041 }
2042
2043 static int handle_write_conflicts(struct drbd_conf *mdev,
2044                                   struct drbd_peer_request *peer_req)
2045 {
2046         struct drbd_tconn *tconn = mdev->tconn;
2047         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2048         sector_t sector = peer_req->i.sector;
2049         const unsigned int size = peer_req->i.size;
2050         struct drbd_interval *i;
2051         bool equal;
2052         int err;
2053
2054         /*
2055          * Inserting the peer request into the write_requests tree will prevent
2056          * new conflicting local requests from being added.
2057          */
2058         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2059
2060     repeat:
2061         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2062                 if (i == &peer_req->i)
2063                         continue;
2064
2065                 if (!i->local) {
2066                         /*
2067                          * Our peer has sent a conflicting remote request; this
2068                          * should not happen in a two-node setup.  Wait for the
2069                          * earlier peer request to complete.
2070                          */
2071                         err = drbd_wait_misc(mdev, i);
2072                         if (err)
2073                                 goto out;
2074                         goto repeat;
2075                 }
2076
2077                 equal = i->sector == sector && i->size == size;
2078                 if (resolve_conflicts) {
2079                         /*
2080                          * If the peer request is fully contained within the
2081                          * overlapping request, it can be considered overwritten
2082                          * and thus superseded; otherwise, it will be retried
2083                          * once all overlapping requests have completed.
2084                          */
2085                         bool superseded = i->sector <= sector && i->sector +
2086                                        (i->size >> 9) >= sector + (size >> 9);
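                        /* e.g. a local request covering sectors [0, 16) fully
                         * contains a peer write covering [4, 12); the peer
                         * write is then considered superseded. */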
2087
2088                         if (!equal)
2089                                 dev_alert(DEV, "Concurrent writes detected: "
2090                                                "local=%llus +%u, remote=%llus +%u, "
2091                                                "assuming %s came first\n",
2092                                           (unsigned long long)i->sector, i->size,
2093                                           (unsigned long long)sector, size,
2094                                           superseded ? "local" : "remote");
2095
2096                         inc_unacked(mdev);
2097                         peer_req->w.cb = superseded ? e_send_superseded :
2098                                                    e_send_retry_write;
2099                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2100                         wake_asender(mdev->tconn);
2101
2102                         err = -ENOENT;
2103                         goto out;
2104                 } else {
2105                         struct drbd_request *req =
2106                                 container_of(i, struct drbd_request, i);
2107
2108                         if (!equal)
2109                                 dev_alert(DEV, "Concurrent writes detected: "
2110                                                "local=%llus +%u, remote=%llus +%u\n",
2111                                           (unsigned long long)i->sector, i->size,
2112                                           (unsigned long long)sector, size);
2113
2114                         if (req->rq_state & RQ_LOCAL_PENDING ||
2115                             !(req->rq_state & RQ_POSTPONED)) {
2116                                 /*
2117                                  * Wait for the node with the discard flag to
2118                                  * decide if this request has been superseded
2119                                  * or needs to be retried.
2120                                  * Requests that have been superseded will
2121                                  * disappear from the write_requests tree.
2122                                  *
2123                                  * In addition, wait for the conflicting
2124                                  * request to finish locally before submitting
2125                                  * the conflicting peer request.
2126                                  */
2127                                 err = drbd_wait_misc(mdev, &req->i);
2128                                 if (err) {
2129                                         _conn_request_state(mdev->tconn,
2130                                                             NS(conn, C_TIMEOUT),
2131                                                             CS_HARD);
2132                                         fail_postponed_requests(mdev, sector, size);
2133                                         goto out;
2134                                 }
2135                                 goto repeat;
2136                         }
2137                         /*
2138                          * Remember to restart the conflicting requests after
2139                          * the new peer request has completed.
2140                          */
2141                         peer_req->flags |= EE_RESTART_REQUESTS;
2142                 }
2143         }
2144         err = 0;
2145
2146     out:
2147         if (err)
2148                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2149         return err;
2150 }
2151
2152 /* mirrored write */
2153 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2154 {
2155         struct drbd_conf *mdev;
2156         sector_t sector;
2157         struct drbd_peer_request *peer_req;
2158         struct p_data *p = pi->data;
2159         u32 peer_seq = be32_to_cpu(p->seq_num);
2160         int rw = WRITE;
2161         u32 dp_flags;
2162         int err, tp;
2163
2164         mdev = vnr_to_mdev(tconn, pi->vnr);
2165         if (!mdev)
2166                 return -EIO;
2167
2168         if (!get_ldev(mdev)) {
2169                 int err2;
2170
2171                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2172                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2173                 atomic_inc(&tconn->current_epoch->epoch_size);
2174                 err2 = drbd_drain_block(mdev, pi->size);
2175                 if (!err)
2176                         err = err2;
2177                 return err;
2178         }
2179
2180         /*
2181          * Corresponding put_ldev done either below (on various errors), or in
2182          * drbd_peer_request_endio, if we successfully submit the data at the
2183          * end of this function.
2184          */
2185
2186         sector = be64_to_cpu(p->sector);
2187         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2188         if (!peer_req) {
2189                 put_ldev(mdev);
2190                 return -EIO;
2191         }
2192
2193         peer_req->w.cb = e_end_block;
2194
2195         dp_flags = be32_to_cpu(p->dp_flags);
2196         rw |= wire_flags_to_bio(mdev, dp_flags);
2197         if (peer_req->pages == NULL) {
2198                 D_ASSERT(peer_req->i.size == 0);
2199                 D_ASSERT(dp_flags & DP_FLUSH);
2200         }
2201
2202         if (dp_flags & DP_MAY_SET_IN_SYNC)
2203                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2204
2205         spin_lock(&tconn->epoch_lock);
2206         peer_req->epoch = tconn->current_epoch;
2207         atomic_inc(&peer_req->epoch->epoch_size);
2208         atomic_inc(&peer_req->epoch->active);
2209         spin_unlock(&tconn->epoch_lock);
2210
2211         rcu_read_lock();
2212         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2213         rcu_read_unlock();
2214         if (tp) {
2215                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2216                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2217                 if (err)
2218                         goto out_interrupted;
2219                 spin_lock_irq(&mdev->tconn->req_lock);
2220                 err = handle_write_conflicts(mdev, peer_req);
2221                 if (err) {
2222                         spin_unlock_irq(&mdev->tconn->req_lock);
2223                         if (err == -ENOENT) {
2224                                 put_ldev(mdev);
2225                                 return 0;
2226                         }
2227                         goto out_interrupted;
2228                 }
2229         } else
2230                 spin_lock_irq(&mdev->tconn->req_lock);
2231         list_add(&peer_req->w.list, &mdev->active_ee);
2232         spin_unlock_irq(&mdev->tconn->req_lock);
2233
2234         if (mdev->state.conn == C_SYNC_TARGET)
2235                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2236
2237         if (mdev->tconn->agreed_pro_version < 100) {
2238                 rcu_read_lock();
2239                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2240                 case DRBD_PROT_C:
2241                         dp_flags |= DP_SEND_WRITE_ACK;
2242                         break;
2243                 case DRBD_PROT_B:
2244                         dp_flags |= DP_SEND_RECEIVE_ACK;
2245                         break;
2246                 }
2247                 rcu_read_unlock();
2248         }
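        /* Net effect (sketch): a protocol-A peer sets neither flag, so the
         * write is never acked; protocol B acks on receipt (P_RECV_ACK,
         * below); protocol C acks only after the write has completed
         * (P_WRITE_ACK via e_end_block()). */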
2249
2250         if (dp_flags & DP_SEND_WRITE_ACK) {
2251                 peer_req->flags |= EE_SEND_WRITE_ACK;
2252                 inc_unacked(mdev);
2253                 /* corresponding dec_unacked() in e_end_block()
2254                  * respective _drbd_clear_done_ee */
2255         }
2256
2257         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2258                 /* I really don't like it that the receiver thread
2259                  * sends on the msock, but anyways */
2260                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2261         }
2262
2263         if (mdev->state.pdsk < D_INCONSISTENT) {
2264                 /* In case we have the only disk of the cluster: mark the peer out of sync */
2265                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2266                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2267                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2268                 drbd_al_begin_io(mdev, &peer_req->i);
2269         }
2270
2271         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2272         if (!err)
2273                 return 0;
2274
2275         /* don't care for the reason here */
2276         dev_err(DEV, "submit failed, triggering re-connect\n");
2277         spin_lock_irq(&mdev->tconn->req_lock);
2278         list_del(&peer_req->w.list);
2279         drbd_remove_epoch_entry_interval(mdev, peer_req);
2280         spin_unlock_irq(&mdev->tconn->req_lock);
2281         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2282                 drbd_al_complete_io(mdev, &peer_req->i);
2283
2284 out_interrupted:
2285         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2286         put_ldev(mdev);
2287         drbd_free_peer_req(mdev, peer_req);
2288         return err;
2289 }
2290
2291 /* We may throttle resync, if the lower device seems to be busy,
2292  * and current sync rate is above c_min_rate.
2293  *
2294  * To decide whether or not the lower device is busy, we use a scheme similar
2295  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2296  * activity (more than 64 sectors) that we cannot account for with our own
2297  * resync activity, the device obviously is "busy".
2298  *
2299  * The current sync rate used here uses only the most recent two step marks,
2300  * to have a short time average so we can react faster.
2301  */
2302 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2303 {
2304         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2305         unsigned long db, dt, dbdt;
2306         struct lc_element *tmp;
2307         int curr_events;
2308         int throttle = 0;
2309         unsigned int c_min_rate;
2310
2311         rcu_read_lock();
2312         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2313         rcu_read_unlock();
2314
2315         /* feature disabled? */
2316         if (c_min_rate == 0)
2317                 return 0;
2318
2319         spin_lock_irq(&mdev->al_lock);
2320         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2321         if (tmp) {
2322                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2323                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2324                         spin_unlock_irq(&mdev->al_lock);
2325                         return 0;
2326                 }
2327                 /* Do not slow down if app IO is already waiting for this extent */
2328         }
2329         spin_unlock_irq(&mdev->al_lock);
2330
2331         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2332                       (int)part_stat_read(&disk->part0, sectors[1]) -
2333                         atomic_read(&mdev->rs_sect_ev);
2334
2335         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2336                 unsigned long rs_left;
2337                 int i;
2338
2339                 mdev->rs_last_events = curr_events;
2340
2341                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2342                  * approx. */
2343                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2344
2345                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2346                         rs_left = mdev->ov_left;
2347                 else
2348                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2349
2350                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2351                 if (!dt)
2352                         dt++;
2353                 db = mdev->rs_mark_left[i] - rs_left;
2354                 dbdt = Bit2KB(db/dt);
2355
2356                 if (dbdt > c_min_rate)
2357                         throttle = 1;
2358         }
2359         return throttle;
2360 }
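/* Worked example (illustrative; assumes the usual 4 KiB of storage per
 * bitmap bit, i.e. Bit2KB(x) == x << 2): if the two most recent sync marks
 * show db == 1024 bits cleared over dt == 2 seconds, then dbdt ==
 * Bit2KB(512) == 2048 KiB/s; with c_min_rate == 250 and a "busy" backing
 * device (events delta > 64), drbd_rs_should_slow_down() returns 1. */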
2361
2362
2363 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2364 {
2365         struct drbd_conf *mdev;
2366         sector_t sector;
2367         sector_t capacity;
2368         struct drbd_peer_request *peer_req;
2369         struct digest_info *di = NULL;
2370         int size, verb;
2371         unsigned int fault_type;
2372         struct p_block_req *p = pi->data;
2373
2374         mdev = vnr_to_mdev(tconn, pi->vnr);
2375         if (!mdev)
2376                 return -EIO;
2377         capacity = drbd_get_capacity(mdev->this_bdev);
2378
2379         sector = be64_to_cpu(p->sector);
2380         size   = be32_to_cpu(p->blksize);
2381
2382         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2383                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2384                                 (unsigned long long)sector, size);
2385                 return -EINVAL;
2386         }
2387         if (sector + (size>>9) > capacity) {
2388                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2389                                 (unsigned long long)sector, size);
2390                 return -EINVAL;
2391         }
2392
2393         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2394                 verb = 1;
2395                 switch (pi->cmd) {
2396                 case P_DATA_REQUEST:
2397                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2398                         break;
2399                 case P_RS_DATA_REQUEST:
2400                 case P_CSUM_RS_REQUEST:
2401                 case P_OV_REQUEST:
2402                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2403                         break;
2404                 case P_OV_REPLY:
2405                         verb = 0;
2406                         dec_rs_pending(mdev);
2407                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2408                         break;
2409                 default:
2410                         BUG();
2411                 }
2412                 if (verb && __ratelimit(&drbd_ratelimit_state))
2413                         dev_err(DEV, "Can not satisfy peer's read request, "
2414                             "no local data.\n");
2415
2416                 /* drain possible payload */
2417                 return drbd_drain_block(mdev, pi->size);
2418         }
2419
2420         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2421          * "criss-cross" setup, that might cause write-out on some other DRBD,
2422          * which in turn might block on the other node at this very place.  */
2423         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2424         if (!peer_req) {
2425                 put_ldev(mdev);
2426                 return -ENOMEM;
2427         }
2428
2429         switch (pi->cmd) {
2430         case P_DATA_REQUEST:
2431                 peer_req->w.cb = w_e_end_data_req;
2432                 fault_type = DRBD_FAULT_DT_RD;
2433                 /* application IO, don't drbd_rs_begin_io */
2434                 goto submit;
2435
2436         case P_RS_DATA_REQUEST:
2437                 peer_req->w.cb = w_e_end_rsdata_req;
2438                 fault_type = DRBD_FAULT_RS_RD;
2439                 /* used in the sector offset progress display */
2440                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2441                 break;
2442
2443         case P_OV_REPLY:
2444         case P_CSUM_RS_REQUEST:
2445                 fault_type = DRBD_FAULT_RS_RD;
2446                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2447                 if (!di)
2448                         goto out_free_e;
2449
2450                 di->digest_size = pi->size;
2451                 di->digest = (((char *)di)+sizeof(struct digest_info));
2452
2453                 peer_req->digest = di;
2454                 peer_req->flags |= EE_HAS_DIGEST;
2455
2456                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2457                         goto out_free_e;
2458
2459                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2460                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2461                         peer_req->w.cb = w_e_end_csum_rs_req;
2462                         /* used in the sector offset progress display */
2463                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2464                 } else if (pi->cmd == P_OV_REPLY) {
2465                         /* track progress, we may need to throttle */
2466                         atomic_add(size >> 9, &mdev->rs_sect_in);
2467                         peer_req->w.cb = w_e_end_ov_reply;
2468                         dec_rs_pending(mdev);
2469                         /* drbd_rs_begin_io done when we sent this request,
2470                          * but accounting still needs to be done. */
2471                         goto submit_for_resync;
2472                 }
2473                 break;
2474
2475         case P_OV_REQUEST:
2476                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2477                     mdev->tconn->agreed_pro_version >= 90) {
2478                         unsigned long now = jiffies;
2479                         int i;
2480                         mdev->ov_start_sector = sector;
2481                         mdev->ov_position = sector;
2482                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2483                         mdev->rs_total = mdev->ov_left;
2484                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2485                                 mdev->rs_mark_left[i] = mdev->ov_left;
2486                                 mdev->rs_mark_time[i] = now;
2487                         }
2488                         dev_info(DEV, "Online Verify start sector: %llu\n",
2489                                         (unsigned long long)sector);
2490                 }
2491                 peer_req->w.cb = w_e_end_ov_req;
2492                 fault_type = DRBD_FAULT_RS_RD;
2493                 break;
2494
2495         default:
2496                 BUG();
2497         }
2498
2499         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2500          * wrt the receiver, but it is not as straightforward as it may seem.
2501          * Various places in the resync start and stop logic assume resync
2502          * requests are processed in order, requeuing this on the worker thread
2503          * introduces a bunch of new code for synchronization between threads.
2504          *
2505          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2506          * "forever", throttling after drbd_rs_begin_io will lock that extent
2507          * for application writes for the same time.  For now, just throttle
2508          * here, where the rest of the code expects the receiver to sleep for
2509          * a while, anyways.
2510          */
2511
2512         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2513          * this defers syncer requests for some time, before letting at least
2514          * one request through.  The resync controller on the receiving side
2515          * will adapt to the incoming rate accordingly.
2516          *
2517          * We cannot throttle here if remote is Primary/SyncTarget:
2518          * we would also throttle its application reads.
2519          * In that case, throttling is done on the SyncTarget only.
2520          */
2521         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2522                 schedule_timeout_uninterruptible(HZ/10);
2523         if (drbd_rs_begin_io(mdev, sector))
2524                 goto out_free_e;
2525
2526 submit_for_resync:
2527         atomic_add(size >> 9, &mdev->rs_sect_ev);
2528
2529 submit:
2530         inc_unacked(mdev);
2531         spin_lock_irq(&mdev->tconn->req_lock);
2532         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2533         spin_unlock_irq(&mdev->tconn->req_lock);
2534
2535         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2536                 return 0;
2537
2538         /* don't care for the reason here */
2539         dev_err(DEV, "submit failed, triggering re-connect\n");
2540         spin_lock_irq(&mdev->tconn->req_lock);
2541         list_del(&peer_req->w.list);
2542         spin_unlock_irq(&mdev->tconn->req_lock);
2543         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2544
2545 out_free_e:
2546         put_ldev(mdev);
2547         drbd_free_peer_req(mdev, peer_req);
2548         return -EIO;
2549 }
2550
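/* After-split-brain recovery when zero primaries remain. Return convention,
 * as the callers below use it: 1 means discard the peer's data, -1 means
 * discard our own, -100 means no automatic decision was possible. */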
2551 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2552 {
2553         int self, peer, rv = -100;
2554         unsigned long ch_self, ch_peer;
2555         enum drbd_after_sb_p after_sb_0p;
2556
2557         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2558         peer = mdev->p_uuid[UI_BITMAP] & 1;
2559
2560         ch_peer = mdev->p_uuid[UI_SIZE];
2561         ch_self = mdev->comm_bm_set;
2562
2563         rcu_read_lock();
2564         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2565         rcu_read_unlock();
2566         switch (after_sb_0p) {
2567         case ASB_CONSENSUS:
2568         case ASB_DISCARD_SECONDARY:
2569         case ASB_CALL_HELPER:
2570         case ASB_VIOLENTLY:
2571                 dev_err(DEV, "Configuration error.\n");
2572                 break;
2573         case ASB_DISCONNECT:
2574                 break;
2575         case ASB_DISCARD_YOUNGER_PRI:
2576                 if (self == 0 && peer == 1) {
2577                         rv = -1;
2578                         break;
2579                 }
2580                 if (self == 1 && peer == 0) {
2581                         rv =  1;
2582                         break;
2583                 }
2584                 /* Else fall through to one of the other strategies... */
2585         case ASB_DISCARD_OLDER_PRI:
2586                 if (self == 0 && peer == 1) {
2587                         rv = 1;
2588                         break;
2589                 }
2590                 if (self == 1 && peer == 0) {
2591                         rv = -1;
2592                         break;
2593                 }
2594                 /* Else fall through to one of the other strategies... */
2595                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2596                      "Using discard-least-changes instead\n");
2597         case ASB_DISCARD_ZERO_CHG:
2598                 if (ch_peer == 0 && ch_self == 0) {
2599                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2600                                 ? -1 : 1;
2601                         break;
2602                 } else {
2603                         if (ch_peer == 0) { rv =  1; break; }
2604                         if (ch_self == 0) { rv = -1; break; }
2605                 }
2606                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2607                         break;
2608         case ASB_DISCARD_LEAST_CHG:
2609                 if      (ch_self < ch_peer)
2610                         rv = -1;
2611                 else if (ch_self > ch_peer)
2612                         rv =  1;
2613                 else /* ( ch_self == ch_peer ) */
2614                      /* Well, then use something else. */
2615                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2616                                 ? -1 : 1;
2617                 break;
2618         case ASB_DISCARD_LOCAL:
2619                 rv = -1;
2620                 break;
2621         case ASB_DISCARD_REMOTE:
2622                 rv =  1;
2623         }
2624
2625         return rv;
2626 }
2627
2628 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2629 {
2630         int hg, rv = -100;
2631         enum drbd_after_sb_p after_sb_1p;
2632
2633         rcu_read_lock();
2634         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2635         rcu_read_unlock();
2636         switch (after_sb_1p) {
2637         case ASB_DISCARD_YOUNGER_PRI:
2638         case ASB_DISCARD_OLDER_PRI:
2639         case ASB_DISCARD_LEAST_CHG:
2640         case ASB_DISCARD_LOCAL:
2641         case ASB_DISCARD_REMOTE:
2642         case ASB_DISCARD_ZERO_CHG:
2643                 dev_err(DEV, "Configuration error.\n");
2644                 break;
2645         case ASB_DISCONNECT:
2646                 break;
2647         case ASB_CONSENSUS:
2648                 hg = drbd_asb_recover_0p(mdev);
2649                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2650                         rv = hg;
2651                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2652                         rv = hg;
2653                 break;
2654         case ASB_VIOLENTLY:
2655                 rv = drbd_asb_recover_0p(mdev);
2656                 break;
2657         case ASB_DISCARD_SECONDARY:
2658                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2659         case ASB_CALL_HELPER:
2660                 hg = drbd_asb_recover_0p(mdev);
2661                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2662                         enum drbd_state_rv rv2;
2663
2664                         drbd_set_role(mdev, R_SECONDARY, 0);
2665                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2666                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2667                           * we do not need to wait for the after state change work either. */
2668                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2669                         if (rv2 != SS_SUCCESS) {
2670                                 drbd_khelper(mdev, "pri-lost-after-sb");
2671                         } else {
2672                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2673                                 rv = hg;
2674                         }
2675                 } else
2676                         rv = hg;
2677         }
2678
2679         return rv;
2680 }
2681
2682 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2683 {
2684         int hg, rv = -100;
2685         enum drbd_after_sb_p after_sb_2p;
2686
2687         rcu_read_lock();
2688         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2689         rcu_read_unlock();
2690         switch (after_sb_2p) {
2691         case ASB_DISCARD_YOUNGER_PRI:
2692         case ASB_DISCARD_OLDER_PRI:
2693         case ASB_DISCARD_LEAST_CHG:
2694         case ASB_DISCARD_LOCAL:
2695         case ASB_DISCARD_REMOTE:
2696         case ASB_CONSENSUS:
2697         case ASB_DISCARD_SECONDARY:
2698         case ASB_DISCARD_ZERO_CHG:
2699                 dev_err(DEV, "Configuration error.\n");
2700                 break;
2701         case ASB_VIOLENTLY:
2702                 rv = drbd_asb_recover_0p(mdev);
2703                 break;
2704         case ASB_DISCONNECT:
2705                 break;
2706         case ASB_CALL_HELPER:
2707                 hg = drbd_asb_recover_0p(mdev);
2708                 if (hg == -1) {
2709                         enum drbd_state_rv rv2;
2710
2711                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2712                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2713                           * we do not need to wait for the after state change work either. */
2714                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2715                         if (rv2 != SS_SUCCESS) {
2716                                 drbd_khelper(mdev, "pri-lost-after-sb");
2717                         } else {
2718                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2719                                 rv = hg;
2720                         }
2721                 } else
2722                         rv = hg;
2723         }
2724
2725         return rv;
2726 }
2727
2728 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2729                            u64 bits, u64 flags)
2730 {
2731         if (!uuid) {
2732                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2733                 return;
2734         }
2735         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2736              text,
2737              (unsigned long long)uuid[UI_CURRENT],
2738              (unsigned long long)uuid[UI_BITMAP],
2739              (unsigned long long)uuid[UI_HISTORY_START],
2740              (unsigned long long)uuid[UI_HISTORY_END],
2741              (unsigned long long)bits,
2742              (unsigned long long)flags);
2743 }
2744
2745 /*
2746   100   after split brain try auto recover
2747     2   C_SYNC_SOURCE set BitMap
2748     1   C_SYNC_SOURCE use BitMap
2749     0   no Sync
2750    -1   C_SYNC_TARGET use BitMap
2751    -2   C_SYNC_TARGET set BitMap
2752  -100   after split brain, disconnect
2753 -1000   unrelated data
2754 -1091   requires proto 91
2755 -1096   requires proto 96
2756  */
2757 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2758 {
2759         u64 self, peer;
2760         int i, j;
2761
2762         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2763         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2764
2765         *rule_nr = 10;
2766         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2767                 return 0;
2768
2769         *rule_nr = 20;
2770         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2771              peer != UUID_JUST_CREATED)
2772                 return -2;
2773
2774         *rule_nr = 30;
2775         if (self != UUID_JUST_CREATED &&
2776             (peer == UUID_JUST_CREATED || peer == (u64)0))
2777                 return 2;
2778
2779         if (self == peer) {
2780                 int rct, dc; /* roles at crash time */
2781
2782                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2783
2784                         if (mdev->tconn->agreed_pro_version < 91)
2785                                 return -1091;
2786
2787                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2788                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2789                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2790                                 drbd_uuid_move_history(mdev);
2791                                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2792                                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2793
2794                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2795                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2796                                 *rule_nr = 34;
2797                         } else {
2798                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2799                                 *rule_nr = 36;
2800                         }
2801
2802                         return 1;
2803                 }
2804
2805                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2806
2807                         if (mdev->tconn->agreed_pro_version < 91)
2808                                 return -1091;
2809
2810                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2811                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2812                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2813
2814                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2815                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2816                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2817
2818                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2819                                 *rule_nr = 35;
2820                         } else {
2821                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2822                                 *rule_nr = 37;
2823                         }
2824
2825                         return -1;
2826                 }
2827
2828                 /* Common power [off|failure] */
2829                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2830                         (mdev->p_uuid[UI_FLAGS] & 2);
2831                 /* lowest bit is set when we were primary,
2832                  * next bit (weight 2) is set when peer was primary */
2833                 *rule_nr = 40;
2834
2835                 switch (rct) {
2836                 case 0: /* !self_pri && !peer_pri */ return 0;
2837                 case 1: /*  self_pri && !peer_pri */ return 1;
2838                 case 2: /* !self_pri &&  peer_pri */ return -1;
2839                 case 3: /*  self_pri &&  peer_pri */
2840                         dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2841                         return dc ? -1 : 1;
2842                 }
2843         }
2844
2845         *rule_nr = 50;
2846         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2847         if (self == peer)
2848                 return -1;
2849
2850         *rule_nr = 51;
2851         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2852         if (self == peer) {
2853                 if (mdev->tconn->agreed_pro_version < 96 ?
2854                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2855                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2856                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2857                         /* The last P_SYNC_UUID did not get through. Undo the peer's
2858                            UUID modifications from its last start of resync as sync source. */
2859
2860                         if (mdev->tconn->agreed_pro_version < 91)
2861                                 return -1091;
2862
2863                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2864                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2865
2866                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2867                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2868
2869                         return -1;
2870                 }
2871         }
2872
2873         *rule_nr = 60;
2874         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2875         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2876                 peer = mdev->p_uuid[i] & ~((u64)1);
2877                 if (self == peer)
2878                         return -2;
2879         }
2880
2881         *rule_nr = 70;
2882         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2883         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2884         if (self == peer)
2885                 return 1;
2886
2887         *rule_nr = 71;
2888         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2889         if (self == peer) {
2890                 if (mdev->tconn->agreed_pro_version < 96 ?
2891                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2892                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2893                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2894                         /* The last P_SYNC_UUID did not get through. Undo our own
2895                            UUID modifications from our last start of resync as sync source. */
2896
2897                         if (mdev->tconn->agreed_pro_version < 91)
2898                                 return -1091;
2899
2900                         __drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2901                         __drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2902
2903                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2904                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2905                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2906
2907                         return 1;
2908                 }
2909         }
2910
2911
2912         *rule_nr = 80;
2913         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2914         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2915                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2916                 if (self == peer)
2917                         return 2;
2918         }
2919
2920         *rule_nr = 90;
2921         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2922         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2923         if (self == peer && self != ((u64)0))
2924                 return 100;
2925
2926         *rule_nr = 100;
2927         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2928                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2929                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2930                         peer = mdev->p_uuid[j] & ~((u64)1);
2931                         if (self == peer)
2932                                 return -100;
2933                 }
2934         }
2935
2936         return -1000;
2937 }
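
/*
 * Note on the "& ~((u64)1)" masking used throughout drbd_uuid_compare():
 * bit 0 of a UUID carries a flag and does not identify the data generation,
 * so it is stripped before any comparison.  A sketch with a hypothetical
 * helper name (illustration only, not compiled):
 */
#if 0
static inline u64 uuid_without_flag(u64 uuid)
{
        return uuid & ~(u64)1;  /* compare generations, ignore the flag bit */
}
#endif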
2938
2939 /* drbd_sync_handshake() returns the new conn state on success, or
2940    C_MASK on failure.
2941  */
2942 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2943                                            enum drbd_disk_state peer_disk) __must_hold(local)
2944 {
2945         enum drbd_conns rv = C_MASK;
2946         enum drbd_disk_state mydisk;
2947         struct net_conf *nc;
2948         int hg, rule_nr, rr_conflict, tentative;
2949
2950         mydisk = mdev->state.disk;
2951         if (mydisk == D_NEGOTIATING)
2952                 mydisk = mdev->new_state_tmp.disk;
2953
2954         dev_info(DEV, "drbd_sync_handshake:\n");
2955
2956         spin_lock_irq(&mdev->ldev->md.uuid_lock);
2957         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2958         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2959                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2960
2961         hg = drbd_uuid_compare(mdev, &rule_nr);
2962         spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2963
2964         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2965
2966         if (hg == -1000) {
2967                 dev_alert(DEV, "Unrelated data, aborting!\n");
2968                 return C_MASK;
2969         }
2970         if (hg < -1000) {
2971                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2972                 return C_MASK;
2973         }
2974
2975         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2976             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2977                 int f = (hg == -100) || abs(hg) == 2;
2978                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2979                 if (f)
2980                         hg = hg*2;
2981                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2982                      hg > 0 ? "source" : "target");
2983         }
2984
2985         if (abs(hg) == 100)
2986                 drbd_khelper(mdev, "initial-split-brain");
2987
2988         rcu_read_lock();
2989         nc = rcu_dereference(mdev->tconn->net_conf);
2990
2991         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2992                 int pcount = (mdev->state.role == R_PRIMARY)
2993                            + (peer_role == R_PRIMARY);
2994                 int forced = (hg == -100);
2995
2996                 switch (pcount) {
2997                 case 0:
2998                         hg = drbd_asb_recover_0p(mdev);
2999                         break;
3000                 case 1:
3001                         hg = drbd_asb_recover_1p(mdev);
3002                         break;
3003                 case 2:
3004                         hg = drbd_asb_recover_2p(mdev);
3005                         break;
3006                 }
3007                 if (abs(hg) < 100) {
3008                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
3009                              "automatically solved. Sync from %s node\n",
3010                              pcount, (hg < 0) ? "peer" : "this");
3011                         if (forced) {
3012                                 dev_warn(DEV, "Doing a full sync, since"
3013                                      " UUIDs were ambiguous.\n");
3014                                 hg = hg*2;
3015                         }
3016                 }
3017         }
3018
3019         if (hg == -100) {
3020                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3021                         hg = -1;
3022                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3023                         hg = 1;
3024
3025                 if (abs(hg) < 100)
3026                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3027                              "Sync from %s node\n",
3028                              (hg < 0) ? "peer" : "this");
3029         }
3030         rr_conflict = nc->rr_conflict;
3031         tentative = nc->tentative;
3032         rcu_read_unlock();
3033
3034         if (hg == -100) {
3035                 /* FIXME this log message is not correct if we end up here
3036                  * after an attempted attach on a diskless node.
3037                  * We just refuse to attach -- well, we drop the "connection"
3038                  * to that disk, in a way... */
3039                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3040                 drbd_khelper(mdev, "split-brain");
3041                 return C_MASK;
3042         }
3043
3044         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3045                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3046                 return C_MASK;
3047         }
3048
3049         if (hg < 0 && /* intentionally, we do not use mydisk here. */
3050             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3051                 switch (rr_conflict) {
3052                 case ASB_CALL_HELPER:
3053                         drbd_khelper(mdev, "pri-lost");
3054                         /* fall through */
3055                 case ASB_DISCONNECT:
3056                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3057                         return C_MASK;
3058                 case ASB_VIOLENTLY:
3059                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3060                              " assumption\n");
3061                 }
3062         }
3063
3064         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3065                 if (hg == 0)
3066                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3067                 else
3068                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3069                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3070                                  abs(hg) >= 2 ? "full" : "bit-map based");
3071                 return C_MASK;
3072         }
3073
3074         if (abs(hg) >= 2) {
3075                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3076                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3077                                         BM_LOCKED_SET_ALLOWED))
3078                         return C_MASK;
3079         }
3080
3081         if (hg > 0) { /* become sync source. */
3082                 rv = C_WF_BITMAP_S;
3083         } else if (hg < 0) { /* become sync target */
3084                 rv = C_WF_BITMAP_T;
3085         } else {
3086                 rv = C_CONNECTED;
3087                 if (drbd_bm_total_weight(mdev)) {
3088                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3089                              drbd_bm_total_weight(mdev));
3090                 }
3091         }
3092
3093         return rv;
3094 }
3095
3096 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3097 {
3098         /* A peer's ASB_DISCARD_REMOTE paired with our ASB_DISCARD_LOCAL is valid */
3099         if (peer == ASB_DISCARD_REMOTE)
3100                 return ASB_DISCARD_LOCAL;
3101
3102         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3103         if (peer == ASB_DISCARD_LOCAL)
3104                 return ASB_DISCARD_REMOTE;
3105
3106         /* everything else is valid if they are equal on both sides. */
3107         return peer;
3108 }
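
/*
 * convert_after_sb() is an involution: applying it twice yields the original
 * value, so both peers end up comparing their settings in the same frame of
 * reference.  Illustration only (not compiled):
 */
#if 0
static void convert_after_sb_roundtrip(enum drbd_after_sb_p p)
{
        BUG_ON(convert_after_sb(convert_after_sb(p)) != p);
}
#endif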
3109
3110 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3111 {
3112         struct p_protocol *p = pi->data;
3113         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3114         int p_proto, p_discard_my_data, p_two_primaries, cf;
3115         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3116         char integrity_alg[SHARED_SECRET_MAX] = "";
3117         struct crypto_hash *peer_integrity_tfm = NULL;
3118         void *int_dig_in = NULL, *int_dig_vv = NULL;
3119
3120         p_proto         = be32_to_cpu(p->protocol);
3121         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3122         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3123         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3124         p_two_primaries = be32_to_cpu(p->two_primaries);
3125         cf              = be32_to_cpu(p->conn_flags);
3126         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3127
3128         if (tconn->agreed_pro_version >= 87) {
3129                 int err;
3130
3131                 if (pi->size > sizeof(integrity_alg))
3132                         return -EIO;
3133                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3134                 if (err)
3135                         return err;
3136                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3137         }
3138
3139         if (pi->cmd != P_PROTOCOL_UPDATE) {
3140                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3141
3142                 if (cf & CF_DRY_RUN)
3143                         set_bit(CONN_DRY_RUN, &tconn->flags);
3144
3145                 rcu_read_lock();
3146                 nc = rcu_dereference(tconn->net_conf);
3147
3148                 if (p_proto != nc->wire_protocol) {
3149                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3150                         goto disconnect_rcu_unlock;
3151                 }
3152
3153                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3154                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3155                         goto disconnect_rcu_unlock;
3156                 }
3157
3158                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3159                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3160                         goto disconnect_rcu_unlock;
3161                 }
3162
3163                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3164                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3165                         goto disconnect_rcu_unlock;
3166                 }
3167
3168                 if (p_discard_my_data && nc->discard_my_data) {
3169                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3170                         goto disconnect_rcu_unlock;
3171                 }
3172
3173                 if (p_two_primaries != nc->two_primaries) {
3174                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3175                         goto disconnect_rcu_unlock;
3176                 }
3177
3178                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3179                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3180                         goto disconnect_rcu_unlock;
3181                 }
3182
3183                 rcu_read_unlock();
3184         }
3185
3186         if (integrity_alg[0]) {
3187                 int hash_size;
3188
3189                 /*
3190                  * We can only change the peer data integrity algorithm
3191                  * here.  Changing our own data integrity algorithm
3192                  * requires that we send a P_PROTOCOL_UPDATE packet at
3193                  * the same time; otherwise, the peer has no way to
3194                  * the same time; otherwise, the peer cannot tell at
3195                  * which packet boundary the algorithm is supposed to
3196                  * change.
3197
3198                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3199                 if (!peer_integrity_tfm) {
3200                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3201                                  integrity_alg);
3202                         goto disconnect;
3203                 }
3204
3205                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3206                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3207                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3208                 if (!(int_dig_in && int_dig_vv)) {
3209                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3210                         goto disconnect;
3211                 }
3212         }
3213
3214         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3215         if (!new_net_conf) {
3216                 conn_err(tconn, "Allocation of new net_conf failed\n");
3217                 goto disconnect;
3218         }
3219
3220         mutex_lock(&tconn->data.mutex);
3221         mutex_lock(&tconn->conf_update);
3222         old_net_conf = tconn->net_conf;
3223         *new_net_conf = *old_net_conf;
3224
3225         new_net_conf->wire_protocol = p_proto;
3226         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3227         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3228         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3229         new_net_conf->two_primaries = p_two_primaries;
3230
3231         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3232         mutex_unlock(&tconn->conf_update);
3233         mutex_unlock(&tconn->data.mutex);
3234
3235         crypto_free_hash(tconn->peer_integrity_tfm);
3236         kfree(tconn->int_dig_in);
3237         kfree(tconn->int_dig_vv);
3238         tconn->peer_integrity_tfm = peer_integrity_tfm;
3239         tconn->int_dig_in = int_dig_in;
3240         tconn->int_dig_vv = int_dig_vv;
3241
3242         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3243                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3244                           integrity_alg[0] ? integrity_alg : "(none)");
3245
3246         synchronize_rcu();
3247         kfree(old_net_conf);
3248         return 0;
3249
3250 disconnect_rcu_unlock:
3251         rcu_read_unlock();
3252 disconnect:
3253         crypto_free_hash(peer_integrity_tfm);
3254         kfree(int_dig_in);
3255         kfree(int_dig_vv);
3256         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3257         return -EIO;
3258 }
3259
3260 /* helper function
3261  * input: alg name, feature name
3262  * return: NULL (alg name was "")
3263  *         ERR_PTR(error) if something goes wrong
3264  *         or the crypto hash ptr, if it worked out ok. */
3265 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3266                 const char *alg, const char *name)
3267 {
3268         struct crypto_hash *tfm;
3269
3270         if (!alg[0])
3271                 return NULL;
3272
3273         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3274         if (IS_ERR(tfm)) {
3275                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3276                         alg, name, PTR_ERR(tfm));
3277                 return tfm;
3278         }
3279         return tfm;
3280 }
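
/*
 * A caller must distinguish the three possible results of the helper above.
 * A minimal usage sketch (illustration only, not compiled; "example-alg" and
 * the function name are made up):
 */
#if 0
static int example_alloc_digest(struct drbd_conf *mdev)
{
        struct crypto_hash *tfm;

        tfm = drbd_crypto_alloc_digest_safe(mdev, "example-alg", "example");
        if (IS_ERR(tfm))
                return PTR_ERR(tfm);    /* the helper already logged the error */
        if (!tfm)
                return 0;               /* alg name was "": feature disabled */
        /* ... use tfm; release it later with crypto_free_hash(tfm) ... */
        return 0;
}
#endif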
3281
3282 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3283 {
3284         void *buffer = tconn->data.rbuf;
3285         int size = pi->size;
3286
3287         while (size) {
3288                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3289                 s = drbd_recv(tconn, buffer, s);
3290                 if (s <= 0) {
3291                         if (s < 0)
3292                                 return s;
3293                         break;
3294                 }
3295                 size -= s;
3296         }
3297         if (size)
3298                 return -EIO;
3299         return 0;
3300 }
3301
3302 /*
3303  * config_unknown_volume  -  device configuration command for unknown volume
3304  *
3305  * When a device is added to an existing connection, the node on which the
3306  * device is added first will send configuration commands to its peer but the
3307  * peer will not know about the device yet.  It will warn and ignore these
3308  * commands.  Once the device is added on the second node, the second node will
3309  * send the same device configuration commands, but in the other direction.
3310  *
3311  * (We can also end up here if drbd is misconfigured.)
3312  */
3313 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3314 {
3315         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3316                   cmdname(pi->cmd), pi->vnr);
3317         return ignore_remaining_packet(tconn, pi);
3318 }
3319
3320 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3321 {
3322         struct drbd_conf *mdev;
3323         struct p_rs_param_95 *p;
3324         unsigned int header_size, data_size, exp_max_sz;
3325         struct crypto_hash *verify_tfm = NULL;
3326         struct crypto_hash *csums_tfm = NULL;
3327         struct net_conf *old_net_conf, *new_net_conf = NULL;
3328         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3329         const int apv = tconn->agreed_pro_version;
3330         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3331         int fifo_size = 0;
3332         int err;
3333
3334         mdev = vnr_to_mdev(tconn, pi->vnr);
3335         if (!mdev)
3336                 return config_unknown_volume(tconn, pi);
3337
3338         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3339                     : apv == 88 ? sizeof(struct p_rs_param)
3340                                         + SHARED_SECRET_MAX
3341                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3342                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3343
3344         if (pi->size > exp_max_sz) {
3345                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3346                     pi->size, exp_max_sz);
3347                 return -EIO;
3348         }
3349
3350         if (apv <= 88) {
3351                 header_size = sizeof(struct p_rs_param);
3352                 data_size = pi->size - header_size;
3353         } else if (apv <= 94) {
3354                 header_size = sizeof(struct p_rs_param_89);
3355                 data_size = pi->size - header_size;
3356                 D_ASSERT(data_size == 0);
3357         } else {
3358                 header_size = sizeof(struct p_rs_param_95);
3359                 data_size = pi->size - header_size;
3360                 D_ASSERT(data_size == 0);
3361         }
3362
3363         /* initialize verify_alg and csums_alg */
3364         p = pi->data;
3365         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3366
3367         err = drbd_recv_all(mdev->tconn, p, header_size);
3368         if (err)
3369                 return err;
3370
3371         mutex_lock(&mdev->tconn->conf_update);
3372         old_net_conf = mdev->tconn->net_conf;
3373         if (get_ldev(mdev)) {
3374                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3375                 if (!new_disk_conf) {
3376                         put_ldev(mdev);
3377                         mutex_unlock(&mdev->tconn->conf_update);
3378                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3379                         return -ENOMEM;
3380                 }
3381
3382                 old_disk_conf = mdev->ldev->disk_conf;
3383                 *new_disk_conf = *old_disk_conf;
3384
3385                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3386         }
3387
3388         if (apv >= 88) {
3389                 if (apv == 88) {
3390                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3391                                 dev_err(DEV, "verify-alg of wrong size, "
3392                                         "peer wants %u, accepting only up to %u byte\n",
3393                                         data_size, SHARED_SECRET_MAX);
3394                                 err = -EIO;
3395                                 goto reconnect;
3396                         }
3397
3398                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3399                         if (err)
3400                                 goto reconnect;
3401                         /* we expect NUL terminated string */
3402                         /* but just in case someone tries to be evil */
3403                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3404                         p->verify_alg[data_size-1] = 0;
3405
3406                 } else /* apv >= 89 */ {
3407                         /* we still expect NUL terminated strings */
3408                         /* but just in case someone tries to be evil */
3409                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3410                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3411                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3412                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3413                 }
3414
3415                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3416                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3417                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3418                                     old_net_conf->verify_alg, p->verify_alg);
3419                                 goto disconnect;
3420                         }
3421                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3422                                         p->verify_alg, "verify-alg");
3423                         if (IS_ERR(verify_tfm)) {
3424                                 verify_tfm = NULL;
3425                                 goto disconnect;
3426                         }
3427                 }
3428
3429                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3430                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3431                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3432                                     old_net_conf->csums_alg, p->csums_alg);
3433                                 goto disconnect;
3434                         }
3435                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3436                                         p->csums_alg, "csums-alg");
3437                         if (IS_ERR(csums_tfm)) {
3438                                 csums_tfm = NULL;
3439                                 goto disconnect;
3440                         }
3441                 }
3442
3443                 if (apv > 94 && new_disk_conf) {
3444                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3445                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3446                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3447                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3448
3449                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3450                         if (fifo_size != mdev->rs_plan_s->size) {
3451                                 new_plan = fifo_alloc(fifo_size);
3452                                 if (!new_plan) {
3453                                         dev_err(DEV, "fifo_alloc of fifo_buffer failed\n");
3454                                         put_ldev(mdev);
3455                                         goto disconnect;
3456                                 }
3457                         }
3458                 }
3459
3460                 if (verify_tfm || csums_tfm) {
3461                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3462                         if (!new_net_conf) {
3463                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3464                                 goto disconnect;
3465                         }
3466
3467                         *new_net_conf = *old_net_conf;
3468
3469                         if (verify_tfm) {
3470                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3471                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3472                                 crypto_free_hash(mdev->tconn->verify_tfm);
3473                                 mdev->tconn->verify_tfm = verify_tfm;
3474                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3475                         }
3476                         if (csums_tfm) {
3477                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3478                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3479                                 crypto_free_hash(mdev->tconn->csums_tfm);
3480                                 mdev->tconn->csums_tfm = csums_tfm;
3481                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3482                         }
3483                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3484                 }
3485         }
3486
3487         if (new_disk_conf) {
3488                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3489                 put_ldev(mdev);
3490         }
3491
3492         if (new_plan) {
3493                 old_plan = mdev->rs_plan_s;
3494                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3495         }
3496
3497         mutex_unlock(&mdev->tconn->conf_update);
3498         synchronize_rcu();
3499         if (new_net_conf)
3500                 kfree(old_net_conf);
3501         kfree(old_disk_conf);
3502         kfree(old_plan);
3503
3504         return 0;
3505
3506 reconnect:
3507         if (new_disk_conf) {
3508                 put_ldev(mdev);
3509                 kfree(new_disk_conf);
3510         }
3511         mutex_unlock(&mdev->tconn->conf_update);
3512         return -EIO;
3513
3514 disconnect:
3515         kfree(new_plan);
3516         if (new_disk_conf) {
3517                 put_ldev(mdev);
3518                 kfree(new_disk_conf);
3519         }
3520         mutex_unlock(&mdev->tconn->conf_update);
3521         /* just for completeness: actually not needed,
3522          * as this is not reached if csums_tfm was ok. */
3523         crypto_free_hash(csums_tfm);
3524         /* but free the verify_tfm again, if csums_tfm did not work out */
3525         crypto_free_hash(verify_tfm);
3526         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3527         return -EIO;
3528 }
3529
3530 /* warn if the arguments differ by more than 12.5% */
3531 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3532         const char *s, sector_t a, sector_t b)
3533 {
3534         sector_t d;
3535         if (a == 0 || b == 0)
3536                 return;
3537         d = (a > b) ? (a - b) : (b - a);
3538         if (d > (a>>3) || d > (b>>3))
3539                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3540                      (unsigned long long)a, (unsigned long long)b);
3541 }
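
/*
 * Worked example for the 12.5% threshold above (x >> 3 is x / 8):
 * a = 1000, b = 880 gives d = 120 > (b >> 3) = 110, so the warning fires;
 * a = 1000, b = 900 gives d = 100, which exceeds neither (a >> 3) = 125
 * nor (b >> 3) = 112, so it stays quiet.
 */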
3542
3543 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3544 {
3545         struct drbd_conf *mdev;
3546         struct p_sizes *p = pi->data;
3547         enum determine_dev_size dd = unchanged;
3548         sector_t p_size, p_usize, my_usize;
3549         int ldsc = 0; /* local disk size changed */
3550         enum dds_flags ddsf;
3551
3552         mdev = vnr_to_mdev(tconn, pi->vnr);
3553         if (!mdev)
3554                 return config_unknown_volume(tconn, pi);
3555
3556         p_size = be64_to_cpu(p->d_size);
3557         p_usize = be64_to_cpu(p->u_size);
3558
3559         /* just store the peer's disk size for now.
3560          * we still need to figure out whether we accept that. */
3561         mdev->p_size = p_size;
3562
3563         if (get_ldev(mdev)) {
3564                 rcu_read_lock();
3565                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3566                 rcu_read_unlock();
3567
3568                 warn_if_differ_considerably(mdev, "lower level device sizes",
3569                            p_size, drbd_get_max_capacity(mdev->ldev));
3570                 warn_if_differ_considerably(mdev, "user requested size",
3571                                             p_usize, my_usize);
3572
3573                 /* if this is the first connect, or an otherwise expected
3574                  * param exchange, choose the minimum */
3575                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3576                         p_usize = min_not_zero(my_usize, p_usize);
3577
3578                 /* Never shrink a device with usable data during connect.
3579                    But allow online shrinking if we are connected. */
3580                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3581                     drbd_get_capacity(mdev->this_bdev) &&
3582                     mdev->state.disk >= D_OUTDATED &&
3583                     mdev->state.conn < C_CONNECTED) {
3584                         dev_err(DEV, "The peer's disk size is too small!\n");
3585                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3586                         put_ldev(mdev);
3587                         return -EIO;
3588                 }
3589
3590                 if (my_usize != p_usize) {
3591                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3592
3593                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3594                         if (!new_disk_conf) {
3595                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3596                                 put_ldev(mdev);
3597                                 return -ENOMEM;
3598                         }
3599
3600                         mutex_lock(&mdev->tconn->conf_update);
3601                         old_disk_conf = mdev->ldev->disk_conf;
3602                         *new_disk_conf = *old_disk_conf;
3603                         new_disk_conf->disk_size = p_usize;
3604
3605                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3606                         mutex_unlock(&mdev->tconn->conf_update);
3607                         synchronize_rcu();
3608                         kfree(old_disk_conf);
3609
3610                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3611                                  (unsigned long)p_usize);
3612                 }
3613
3614                 put_ldev(mdev);
3615         }
3616
3617         ddsf = be16_to_cpu(p->dds_flags);
3618         if (get_ldev(mdev)) {
3619                 dd = drbd_determine_dev_size(mdev, ddsf);
3620                 put_ldev(mdev);
3621                 if (dd == dev_size_error)
3622                         return -EIO;
3623                 drbd_md_sync(mdev);
3624         } else {
3625                 /* I am diskless, need to accept the peer's size. */
3626                 drbd_set_my_capacity(mdev, p_size);
3627         }
3628
3629         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3630         drbd_reconsider_max_bio_size(mdev);
3631
3632         if (get_ldev(mdev)) {
3633                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3634                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3635                         ldsc = 1;
3636                 }
3637
3638                 put_ldev(mdev);
3639         }
3640
3641         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3642                 if (be64_to_cpu(p->c_size) !=
3643                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3644                         /* we have different sizes, probably peer
3645                          * needs to know my new size... */
3646                         drbd_send_sizes(mdev, 0, ddsf);
3647                 }
3648                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3649                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3650                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3651                             mdev->state.disk >= D_INCONSISTENT) {
3652                                 if (ddsf & DDSF_NO_RESYNC)
3653                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3654                                 else
3655                                         resync_after_online_grow(mdev);
3656                         } else
3657                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3658                 }
3659         }
3660
3661         return 0;
3662 }
3663
3664 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3665 {
3666         struct drbd_conf *mdev;
3667         struct p_uuids *p = pi->data;
3668         u64 *p_uuid;
3669         int i, updated_uuids = 0;
3670
3671         mdev = vnr_to_mdev(tconn, pi->vnr);
3672         if (!mdev)
3673                 return config_unknown_volume(tconn, pi);
3674
3675         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3676         if (!p_uuid) {
3677                 dev_err(DEV, "kmalloc of p_uuid failed\n");
3678                 return -ENOMEM;
3679         }
3680
3681         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3682                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3683
3684         kfree(mdev->p_uuid);
3685         mdev->p_uuid = p_uuid;
3686
3687         if (mdev->state.conn < C_CONNECTED &&
3688             mdev->state.disk < D_INCONSISTENT &&
3689             mdev->state.role == R_PRIMARY &&
3690             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3691                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3692                     (unsigned long long)mdev->ed_uuid);
3693                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3694                 return -EIO;
3695         }
3696
3697         if (get_ldev(mdev)) {
3698                 int skip_initial_sync =
3699                         mdev->state.conn == C_CONNECTED &&
3700                         mdev->tconn->agreed_pro_version >= 90 &&
3701                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3702                         (p_uuid[UI_FLAGS] & 8);
3703                 if (skip_initial_sync) {
3704                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3705                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3706                                         "clear_n_write from receive_uuids",
3707                                         BM_LOCKED_TEST_ALLOWED);
3708                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3709                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3710                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3711                                         CS_VERBOSE, NULL);
3712                         drbd_md_sync(mdev);
3713                         updated_uuids = 1;
3714                 }
3715                 put_ldev(mdev);
3716         } else if (mdev->state.disk < D_INCONSISTENT &&
3717                    mdev->state.role == R_PRIMARY) {
3718                 /* I am a diskless primary, the peer just created a new current UUID
3719                    for me. */
3720                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3721         }
3722
3723         /* Before we test the disk state, we should wait until any cluster-wide
3724            state change still in progress has finished. That is important if
3725            we are primary and are detaching from our disk. We need to see the
3726            new disk state... */
3727         mutex_lock(mdev->state_mutex);
3728         mutex_unlock(mdev->state_mutex);
3729         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3730                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3731
3732         if (updated_uuids)
3733                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3734
3735         return 0;
3736 }
3737
3738 /**
3739  * convert_state() - Converts the peer's view of the cluster state to our point of view
3740  * @ps:         The state as seen by the peer.
3741  */
3742 static union drbd_state convert_state(union drbd_state ps)
3743 {
3744         union drbd_state ms;
3745
3746         static enum drbd_conns c_tab[] = {
3747                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3748                 [C_CONNECTED] = C_CONNECTED,
3749
3750                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3751                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3752                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3753                 [C_VERIFY_S]       = C_VERIFY_T,
3754                 [C_MASK]   = C_MASK,
3755         };
3756
3757         ms.i = ps.i;
3758
3759         ms.conn = c_tab[ps.conn];
3760         ms.peer = ps.role;
3761         ms.role = ps.peer;
3762         ms.pdsk = ps.disk;
3763         ms.disk = ps.pdsk;
3764         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3765
3766         return ms;
3767 }
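
/*
 * Example of the mirroring above: if the peer reports
 *   { role = R_PRIMARY, peer = R_SECONDARY, disk = D_UP_TO_DATE,
 *     pdsk = D_INCONSISTENT, conn = C_STARTING_SYNC_S },
 * we store that from our point of view as
 *   { role = R_SECONDARY, peer = R_PRIMARY, disk = D_INCONSISTENT,
 *     pdsk = D_UP_TO_DATE, conn = C_STARTING_SYNC_T }.
 */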
3768
3769 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3770 {
3771         struct drbd_conf *mdev;
3772         struct p_req_state *p = pi->data;
3773         union drbd_state mask, val;
3774         enum drbd_state_rv rv;
3775
3776         mdev = vnr_to_mdev(tconn, pi->vnr);
3777         if (!mdev)
3778                 return -EIO;
3779
3780         mask.i = be32_to_cpu(p->mask);
3781         val.i = be32_to_cpu(p->val);
3782
3783         if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3784             mutex_is_locked(mdev->state_mutex)) {
3785                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3786                 return 0;
3787         }
3788
3789         mask = convert_state(mask);
3790         val = convert_state(val);
3791
3792         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3793         drbd_send_sr_reply(mdev, rv);
3794
3795         drbd_md_sync(mdev);
3796
3797         return 0;
3798 }
3799
3800 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3801 {
3802         struct p_req_state *p = pi->data;
3803         union drbd_state mask, val;
3804         enum drbd_state_rv rv;
3805
3806         mask.i = be32_to_cpu(p->mask);
3807         val.i = be32_to_cpu(p->val);
3808
3809         if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3810             mutex_is_locked(&tconn->cstate_mutex)) {
3811                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3812                 return 0;
3813         }
3814
3815         mask = convert_state(mask);
3816         val = convert_state(val);
3817
3818         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3819         conn_send_sr_reply(tconn, rv);
3820
3821         return 0;
3822 }
3823
3824 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3825 {
3826         struct drbd_conf *mdev;
3827         struct p_state *p = pi->data;
3828         union drbd_state os, ns, peer_state;
3829         enum drbd_disk_state real_peer_disk;
3830         enum chg_state_flags cs_flags;
3831         int rv;
3832
3833         mdev = vnr_to_mdev(tconn, pi->vnr);
3834         if (!mdev)
3835                 return config_unknown_volume(tconn, pi);
3836
3837         peer_state.i = be32_to_cpu(p->state);
3838
3839         real_peer_disk = peer_state.disk;
3840         if (peer_state.disk == D_NEGOTIATING) {
3841                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3842                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3843         }
3844
3845         spin_lock_irq(&mdev->tconn->req_lock);
3846  retry:
3847         os = ns = drbd_read_state(mdev);
3848         spin_unlock_irq(&mdev->tconn->req_lock);
3849
3850         /* If some other part of the code (asender thread, timeout)
3851          * already decided to close the connection again,
3852          * we must not "re-establish" it here. */
3853         if (os.conn <= C_TEAR_DOWN)
3854                 return -ECONNRESET;
3855
3856         /* If this is the "end of sync" confirmation, usually the peer disk
3857          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3858          * set) resync started in PausedSyncT, or if the timing of pause-/
3859          * unpause-sync events has been "just right", the peer disk may
3860          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3861          */
3862         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3863             real_peer_disk == D_UP_TO_DATE &&
3864             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3865                 /* If we are (becoming) SyncSource, but peer is still in sync
3866                  * preparation, ignore its uptodate-ness to avoid flapping, it
3867                  * will change to inconsistent once the peer reaches active
3868                  * syncing states.
3869                  * It may have changed syncer-paused flags, however, so we
3870                  * cannot ignore this completely. */
3871                 if (peer_state.conn > C_CONNECTED &&
3872                     peer_state.conn < C_SYNC_SOURCE)
3873                         real_peer_disk = D_INCONSISTENT;
3874
3875                 /* if peer_state changes to connected at the same time,
3876                  * it explicitly notifies us that it finished resync.
3877                  * Maybe we should finish it up, too? */
3878                 else if (os.conn >= C_SYNC_SOURCE &&
3879                          peer_state.conn == C_CONNECTED) {
3880                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3881                                 drbd_resync_finished(mdev);
3882                         return 0;
3883                 }
3884         }
3885
3886         /* explicit verify finished notification, stop sector reached. */
3887         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3888             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3889                 ov_out_of_sync_print(mdev);
3890                 drbd_resync_finished(mdev);
3891                 return 0;
3892         }
3893
3894         /* peer says his disk is inconsistent, while we think it is uptodate,
3895          * and this happens while the peer still thinks we have a sync going on,
3896          * but we think we are already done with the sync.
3897          * We ignore this to avoid flapping pdsk.
3898          * This should not happen, if the peer is a recent version of drbd. */
3899         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3900             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3901                 real_peer_disk = D_UP_TO_DATE;
3902
3903         if (ns.conn == C_WF_REPORT_PARAMS)
3904                 ns.conn = C_CONNECTED;
3905
3906         if (peer_state.conn == C_AHEAD)
3907                 ns.conn = C_BEHIND;
3908
3909         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3910             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3911                 int cr; /* consider resync */
3912
3913                 /* if we established a new connection */
3914                 cr  = (os.conn < C_CONNECTED);
3915                 /* if we had an established connection
3916                  * and one of the nodes newly attaches a disk */
3917                 cr |= (os.conn == C_CONNECTED &&
3918                        (peer_state.disk == D_NEGOTIATING ||
3919                         os.disk == D_NEGOTIATING));
3920                 /* if we have both been inconsistent, and the peer has been
3921                  * forced to be UpToDate with --overwrite-data */
3922                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3923                 /* if we had been plain connected, and the admin requested to
3924                  * start a sync by "invalidate" or "invalidate-remote" */
3925                 cr |= (os.conn == C_CONNECTED &&
3926                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3927                                  peer_state.conn <= C_WF_BITMAP_T));
3928
3929                 if (cr)
3930                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3931
3932                 put_ldev(mdev);
3933                 if (ns.conn == C_MASK) {
3934                         ns.conn = C_CONNECTED;
3935                         if (mdev->state.disk == D_NEGOTIATING) {
3936                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3937                         } else if (peer_state.disk == D_NEGOTIATING) {
3938                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3939                                 peer_state.disk = D_DISKLESS;
3940                                 real_peer_disk = D_DISKLESS;
3941                         } else {
3942                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3943                                         return -EIO;
3944                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3945                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3946                                 return -EIO;
3947                         }
3948                 }
3949         }
3950
3951         spin_lock_irq(&mdev->tconn->req_lock);
3952         if (os.i != drbd_read_state(mdev).i)
3953                 goto retry;
3954         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3955         ns.peer = peer_state.role;
3956         ns.pdsk = real_peer_disk;
3957         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3958         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3959                 ns.disk = mdev->new_state_tmp.disk;
3960         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3961         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3962             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3963                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3964                    for temporary network outages! */
3965                 spin_unlock_irq(&mdev->tconn->req_lock);
3966                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3967                 tl_clear(mdev->tconn);
3968                 drbd_uuid_new_current(mdev);
3969                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3970                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3971                 return -EIO;
3972         }
3973         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3974         ns = drbd_read_state(mdev);
3975         spin_unlock_irq(&mdev->tconn->req_lock);
3976
3977         if (rv < SS_SUCCESS) {
3978                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3979                 return -EIO;
3980         }
3981
3982         if (os.conn > C_WF_REPORT_PARAMS) {
3983                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3984                     peer_state.disk != D_NEGOTIATING) {
3985                         /* we want resync, peer has not yet decided to sync... */
3986                         /* Nowadays only used when forcing a node into primary role and
3987                            setting its disk to UpToDate with that */
3988                         drbd_send_uuids(mdev);
3989                         drbd_send_current_state(mdev);
3990                 }
3991         }
3992
3993         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3994
3995         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3996
3997         return 0;
3998 }
3999
4000 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
4001 {
4002         struct drbd_conf *mdev;
4003         struct p_rs_uuid *p = pi->data;
4004
4005         mdev = vnr_to_mdev(tconn, pi->vnr);
4006         if (!mdev)
4007                 return -EIO;
4008
4009         wait_event(mdev->misc_wait,
4010                    mdev->state.conn == C_WF_SYNC_UUID ||
4011                    mdev->state.conn == C_BEHIND ||
4012                    mdev->state.conn < C_CONNECTED ||
4013                    mdev->state.disk < D_NEGOTIATING);
4014
4015         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4016
4017         /* Here the _drbd_uuid_ functions are right, current should
4018            _not_ be rotated into the history */
4019         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4020                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4021                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4022
4023                 drbd_print_uuids(mdev, "updated sync uuid");
4024                 drbd_start_resync(mdev, C_SYNC_TARGET);
4025
4026                 put_ldev(mdev);
4027         } else
4028                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4029
4030         return 0;
4031 }
4032
4033 /**
4034  * receive_bitmap_plain() - receive an uncompressed chunk of the bitmap
4035  *
4036  * Return 0 when done, 1 when another iteration is needed, and a negative error
4037  * code upon failure.
4038  */
4039 static int
4040 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4041                      unsigned long *p, struct bm_xfer_ctx *c)
4042 {
4043         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4044                                  drbd_header_size(mdev->tconn);
4045         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4046                                        c->bm_words - c->word_offset);
4047         unsigned int want = num_words * sizeof(*p);
4048         int err;
4049
4050         if (want != size) {
4051                 dev_err(DEV, "%s: want (%u) != size (%u)\n", __func__, want, size);
4052                 return -EIO;
4053         }
4054         if (want == 0)
4055                 return 0;
4056         err = drbd_recv_all(mdev->tconn, p, want);
4057         if (err)
4058                 return err;
4059
4060         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4061
4062         c->word_offset += num_words;
4063         c->bit_offset = c->word_offset * BITS_PER_LONG;
4064         if (c->bit_offset > c->bm_bits)
4065                 c->bit_offset = c->bm_bits;
4066
4067         return 1;
4068 }
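/*
 * Worked example for the sizing above, with hypothetical values of a
 * 4096 byte DRBD_SOCKET_BUFFER_SIZE, a 16 byte header and 64 bit longs:
 *
 *	data_size = 4096 - 16 = 4080 bytes
 *	num_words = min(4080 / 8, words remaining) = at most 510
 *	want      = num_words * 8 = at most 4080 bytes per P_BITMAP packet
 */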
4069
4070 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4071 {
4072         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4073 }
4074
4075 static int dcbp_get_start(struct p_compressed_bm *p)
4076 {
4077         return (p->encoding & 0x80) != 0;
4078 }
4079
4080 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4081 {
4082         return (p->encoding >> 4) & 0x7;
4083 }
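/*
 * Illustration of the p->encoding byte as the three accessors above
 * interpret it (layout inferred from the masks used):
 *
 *	bit  7    : "start" value; whether the first RLE run consists of
 *	            set bits (1) or of clear bits (0)
 *	bits 6..4 : number of padding bits at the end of the bit stream
 *	bits 3..0 : enum drbd_bitmap_code, e.g. RLE_VLI_Bits
 */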
4084
4085 /**
4086  * recv_bm_rle_bits - decode RLE+VLI compressed bitmap data
4087  *
4088  * Return 0 when done, 1 when another iteration is needed, and a negative error
4089  * code upon failure.
4090  */
4091 static int
4092 recv_bm_rle_bits(struct drbd_conf *mdev,
4093                  struct p_compressed_bm *p,
4094                  struct bm_xfer_ctx *c,
4095                  unsigned int len)
4096 {
4097         struct bitstream bs;
4098         u64 look_ahead;
4099         u64 rl;
4100         u64 tmp;
4101         unsigned long s = c->bit_offset;
4102         unsigned long e;
4103         int toggle = dcbp_get_start(p);
4104         int have;
4105         int bits;
4106
4107         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4108
4109         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4110         if (bits < 0)
4111                 return -EIO;
4112
4113         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4114                 bits = vli_decode_bits(&rl, look_ahead);
4115                 if (bits <= 0)
4116                         return -EIO;
4117
4118                 if (toggle) {
4119                         e = s + rl - 1;
4120                         if (e >= c->bm_bits) {
4121                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4122                                 return -EIO;
4123                         }
4124                         _drbd_bm_set_bits(mdev, s, e);
4125                 }
4126
4127                 if (have < bits) {
4128                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4129                                 have, bits, look_ahead,
4130                                 (unsigned int)(bs.cur.b - p->code),
4131                                 (unsigned int)bs.buf_len);
4132                         return -EIO;
4133                 }
4134                 look_ahead >>= bits;
4135                 have -= bits;
4136
4137                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4138                 if (bits < 0)
4139                         return -EIO;
4140                 look_ahead |= tmp << have;
4141                 have += bits;
4142         }
4143
4144         c->bit_offset = s;
4145         bm_xfer_ctx_bit_to_word_offset(c);
4146
4147         return (s != c->bm_bits);
4148 }
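/*
 * Toy example of the decode loop above, with hypothetical run lengths:
 * starting at bit_offset 0 with dcbp_get_start() == 0, the decoded runs
 * 5, 3, 2 describe the bit pattern 00000 111 00.  Only the "toggled on"
 * run touches the bitmap, i.e. _drbd_bm_set_bits(mdev, 5, 7) is called
 * exactly once, and c->bit_offset advances to 10.
 */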
4149
4150 /**
4151  * decode_bitmap_c - dispatch on the compressed bitmap encoding
4152  *
4153  * Return 0 when done, 1 when another iteration is needed, and a negative error
4154  * code upon failure.
4155  */
4156 static int
4157 decode_bitmap_c(struct drbd_conf *mdev,
4158                 struct p_compressed_bm *p,
4159                 struct bm_xfer_ctx *c,
4160                 unsigned int len)
4161 {
4162         if (dcbp_get_code(p) == RLE_VLI_Bits)
4163                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4164
4165         /* other variants had been implemented for evaluation,
4166          * but have been dropped as this one turned out to be "best"
4167          * during all our tests. */
4168
4169         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4170         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4171         return -EIO;
4172 }
4173
4174 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4175                 const char *direction, struct bm_xfer_ctx *c)
4176 {
4177         /* what would it take to transfer it "plaintext" */
4178         unsigned int header_size = drbd_header_size(mdev->tconn);
4179         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4180         unsigned int plain =
4181                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4182                 c->bm_words * sizeof(unsigned long);
4183         unsigned int total = c->bytes[0] + c->bytes[1];
4184         unsigned int r;
4185
4186         /* total cannot be zero, but just in case: */
4187         if (total == 0)
4188                 return;
4189
4190         /* don't report if not compressed */
4191         if (total >= plain)
4192                 return;
4193
4194         /* total < plain. check for overflow, still */
4195         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4196                                     : (1000 * total / plain);
4197
4198         if (r > 1000)
4199                 r = 1000;
4200
4201         r = 1000 - r;
4202         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4203              "total %u; compression: %u.%u%%\n",
4204                         direction,
4205                         c->bytes[1], c->packets[1],
4206                         c->bytes[0], c->packets[0],
4207                         total, r/10, r % 10);
4208 }
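/*
 * Worked example for the ratio above, with hypothetical numbers:
 * plain = 1,000,000 bytes and total = 150,000 bytes give
 * r = 1000 - (1000 * 150000 / 1000000) = 850, reported as
 * "compression: 85.0%".
 */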
4209
4210 /* Since we are processing the bitfield from lower addresses to higher,
4211    it does not matter whether we process it in 32 bit or 64 bit chunks,
4212    as long as it is little endian. (Understand it as a byte stream,
4213    beginning with the lowest byte...) If we used big endian, we would
4214    need to process it from the highest address to the lowest in order
4215    to be agnostic to the 32 vs 64 bit issue.
4216
4217    Returns 0 on success, a negative error code otherwise. */
4218 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4219 {
4220         struct drbd_conf *mdev;
4221         struct bm_xfer_ctx c;
4222         int err;
4223
4224         mdev = vnr_to_mdev(tconn, pi->vnr);
4225         if (!mdev)
4226                 return -EIO;
4227
4228         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4229         /* you are supposed to send additional out-of-sync information
4230          * if you actually set bits during this phase */
4231
4232         c = (struct bm_xfer_ctx) {
4233                 .bm_bits = drbd_bm_bits(mdev),
4234                 .bm_words = drbd_bm_words(mdev),
4235         };
4236
4237         for (;;) {
4238                 if (pi->cmd == P_BITMAP)
4239                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4240                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4241                         /* MAYBE: sanity check that we speak proto >= 90,
4242                          * and the feature is enabled! */
4243                         struct p_compressed_bm *p = pi->data;
4244
4245                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4246                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4247                                 err = -EIO;
4248                                 goto out;
4249                         }
4250                         if (pi->size <= sizeof(*p)) {
4251                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4252                                 err = -EIO;
4253                                 goto out;
4254                         }
4255                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4256                         if (err)
4257                                 goto out;
4258                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4259                 } else {
4260                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4261                         err = -EIO;
4262                         goto out;
4263                 }
4264
4265                 c.packets[pi->cmd == P_BITMAP]++;
4266                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4267
4268                 if (err <= 0) {
4269                         if (err < 0)
4270                                 goto out;
4271                         break;
4272                 }
4273                 err = drbd_recv_header(mdev->tconn, pi);
4274                 if (err)
4275                         goto out;
4276         }
4277
4278         INFO_bm_xfer_stats(mdev, "receive", &c);
4279
4280         if (mdev->state.conn == C_WF_BITMAP_T) {
4281                 enum drbd_state_rv rv;
4282
4283                 err = drbd_send_bitmap(mdev);
4284                 if (err)
4285                         goto out;
4286                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4287                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4288                 D_ASSERT(rv == SS_SUCCESS);
4289         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4290                 /* admin may have requested C_DISCONNECTING,
4291                  * other threads may have noticed network errors */
4292                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4293                     drbd_conn_str(mdev->state.conn));
4294         }
4295         err = 0;
4296
4297  out:
4298         drbd_bm_unlock(mdev);
4299         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4300                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4301         return err;
4302 }
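/*
 * Schematic of the tail end of the bitmap exchange as implemented above:
 *
 *	C_WF_BITMAP_T: after merging the peer's bitmap, send our own back,
 *	               then request C_WF_SYNC_UUID and wait for P_SYNC_UUID.
 *	C_WF_BITMAP_S: after merging the peer's bitmap, start the resync
 *	               as C_SYNC_SOURCE.
 */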
4303
4304 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4305 {
4306         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4307                  pi->cmd, pi->size);
4308
4309         return ignore_remaining_packet(tconn, pi);
4310 }
4311
4312 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4313 {
4314         /* Make sure we've acked all the TCP data associated
4315          * with the data requests being unplugged */
4316         drbd_tcp_quickack(tconn->data.socket);
4317
4318         return 0;
4319 }
4320
4321 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4322 {
4323         struct drbd_conf *mdev;
4324         struct p_block_desc *p = pi->data;
4325
4326         mdev = vnr_to_mdev(tconn, pi->vnr);
4327         if (!mdev)
4328                 return -EIO;
4329
4330         switch (mdev->state.conn) {
4331         case C_WF_SYNC_UUID:
4332         case C_WF_BITMAP_T:
4333         case C_BEHIND:
4334                 break;
4335         default:
4336                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4337                                 drbd_conn_str(mdev->state.conn));
4338         }
4339
4340         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4341
4342         return 0;
4343 }
4344
4345 struct data_cmd {
4346         int expect_payload;
4347         size_t pkt_size;
4348         int (*fn)(struct drbd_tconn *, struct packet_info *);
4349 };
4350
4351 static struct data_cmd drbd_cmd_handler[] = {
4352         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4353         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4354         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4355         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4356         [P_BITMAP]          = { 1, 0, receive_bitmap },
4357         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4358         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4359         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4360         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4361         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4362         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4363         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4364         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4365         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4366         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4367         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4368         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4369         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4370         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4371         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4372         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4373         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4374         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4375         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4376 };
4377
4378 static void drbdd(struct drbd_tconn *tconn)
4379 {
4380         struct packet_info pi;
4381         size_t shs; /* sub header size */
4382         int err;
4383
4384         while (get_t_state(&tconn->receiver) == RUNNING) {
4385                 struct data_cmd *cmd;
4386
4387                 drbd_thread_current_set_cpu(&tconn->receiver);
4388                 if (drbd_recv_header(tconn, &pi))
4389                         goto err_out;
4390
4391                 cmd = &drbd_cmd_handler[pi.cmd];
4392                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4393                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4394                                  cmdname(pi.cmd), pi.cmd);
4395                         goto err_out;
4396                 }
4397
4398                 shs = cmd->pkt_size;
4399                 if (pi.size > shs && !cmd->expect_payload) {
4400                         conn_err(tconn, "No payload expected %s l:%d\n",
4401                                  cmdname(pi.cmd), pi.size);
4402                         goto err_out;
4403                 }
4404
4405                 if (shs) {
4406                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4407                         if (err)
4408                                 goto err_out;
4409                         pi.size -= shs;
4410                 }
4411
4412                 err = cmd->fn(tconn, &pi);
4413                 if (err) {
4414                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4415                                  cmdname(pi.cmd), err, pi.size);
4416                         goto err_out;
4417                 }
4418         }
4419         return;
4420
4421     err_out:
4422         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4423 }
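/*
 * Example flow through the loop above for a P_DATA packet:
 * drbd_recv_header() fills in pi.cmd and pi.size (sub-header plus
 * payload); drbdd() receives the sizeof(struct p_data) sub-header into
 * pi.data and shrinks pi.size accordingly; receive_Data() then consumes
 * the remaining pi.size bytes of payload itself.
 */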
4424
4425 void conn_flush_workqueue(struct drbd_tconn *tconn)
4426 {
4427         struct drbd_wq_barrier barr;
4428
4429         barr.w.cb = w_prev_work_done;
4430         barr.w.tconn = tconn;
4431         init_completion(&barr.done);
4432         drbd_queue_work(&tconn->sender_work, &barr.w);
4433         wait_for_completion(&barr.done);
4434 }
4435
4436 static void conn_disconnect(struct drbd_tconn *tconn)
4437 {
4438         struct drbd_conf *mdev;
4439         enum drbd_conns oc;
4440         int vnr;
4441
4442         if (tconn->cstate == C_STANDALONE)
4443                 return;
4444
4445         /* We are about to start the cleanup after connection loss.
4446          * Make sure drbd_make_request knows about that.
4447          * Usually we should be in some network failure state already,
4448          * but just in case we are not, we fix it up here.
4449          */
4450         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4451
4452         /* asender does not clean up anything. it must not interfere, either */
4453         drbd_thread_stop(&tconn->asender);
4454         drbd_free_sock(tconn);
4455
4456         rcu_read_lock();
4457         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4458                 kref_get(&mdev->kref);
4459                 rcu_read_unlock();
4460                 drbd_disconnected(mdev);
4461                 kref_put(&mdev->kref, &drbd_minor_destroy);
4462                 rcu_read_lock();
4463         }
4464         rcu_read_unlock();
4465
4466         if (!list_empty(&tconn->current_epoch->list))
4467                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4468         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4469         atomic_set(&tconn->current_epoch->epoch_size, 0);
4470         tconn->send.seen_any_write_yet = false;
4471
4472         conn_info(tconn, "Connection closed\n");
4473
4474         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4475                 conn_try_outdate_peer_async(tconn);
4476
4477         spin_lock_irq(&tconn->req_lock);
4478         oc = tconn->cstate;
4479         if (oc >= C_UNCONNECTED)
4480                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4481
4482         spin_unlock_irq(&tconn->req_lock);
4483
4484         if (oc == C_DISCONNECTING)
4485                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4486 }
4487
4488 static int drbd_disconnected(struct drbd_conf *mdev)
4489 {
4490         unsigned int i;
4491
4492         /* wait for current activity to cease. */
4493         spin_lock_irq(&mdev->tconn->req_lock);
4494         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4495         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4496         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4497         spin_unlock_irq(&mdev->tconn->req_lock);
4498
4499         /* We do not have data structures that would allow us to
4500          * get the rs_pending_cnt down to 0 again.
4501          *  * On C_SYNC_TARGET we do not have any data structures describing
4502          *    the pending RSDataRequest's we have sent.
4503          *  * On C_SYNC_SOURCE there is no data structure that tracks
4504          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4505          *  And no, it is not the sum of the reference counts in the
4506          *  resync_LRU. The resync_LRU tracks the whole operation including
4507          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4508          *  on the fly. */
4509         drbd_rs_cancel_all(mdev);
4510         mdev->rs_total = 0;
4511         mdev->rs_failed = 0;
4512         atomic_set(&mdev->rs_pending_cnt, 0);
4513         wake_up(&mdev->misc_wait);
4514
4515         del_timer_sync(&mdev->resync_timer);
4516         resync_timer_fn((unsigned long)mdev);
4517
4518         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4519          * w_make_resync_request etc. which may still be on the worker queue
4520          * to be "canceled" */
4521         drbd_flush_workqueue(mdev);
4522
4523         drbd_finish_peer_reqs(mdev);
4524
4525         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4526            might have queued new work. The flush before drbd_finish_peer_reqs() is
4527            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4528         drbd_flush_workqueue(mdev);
4529
4530         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4531          * again via drbd_try_clear_on_disk_bm(). */
4532         drbd_rs_cancel_all(mdev);
4533
4534         kfree(mdev->p_uuid);
4535         mdev->p_uuid = NULL;
4536
4537         if (!drbd_suspended(mdev))
4538                 tl_clear(mdev->tconn);
4539
4540         drbd_md_sync(mdev);
4541
4542         /* serialize with bitmap writeout triggered by the state change,
4543          * if any. */
4544         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4545
4546         /* tcp_close and release of sendpage pages can be deferred.  I don't
4547          * want to use SO_LINGER, because apparently it can be deferred for
4548          * more than 20 seconds (longest time I checked).
4549          *
4550          * Actually we don't care for exactly when the network stack does its
4551          * put_page(), but release our reference on these pages right here.
4552          */
4553         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4554         if (i)
4555                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4556         i = atomic_read(&mdev->pp_in_use_by_net);
4557         if (i)
4558                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4559         i = atomic_read(&mdev->pp_in_use);
4560         if (i)
4561                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4562
4563         D_ASSERT(list_empty(&mdev->read_ee));
4564         D_ASSERT(list_empty(&mdev->active_ee));
4565         D_ASSERT(list_empty(&mdev->sync_ee));
4566         D_ASSERT(list_empty(&mdev->done_ee));
4567
4568         return 0;
4569 }
4570
4571 /*
4572  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4573  * we can agree on is stored in agreed_pro_version.
4574  *
4575  * Feature flags and the reserved array should be enough room for future
4576  * enhancements of the handshake protocol, and possible plugins...
4577  *
4578  * For now, they are expected to be zero, but they are ignored in any case.
4579  */
4580 static int drbd_send_features(struct drbd_tconn *tconn)
4581 {
4582         struct drbd_socket *sock;
4583         struct p_connection_features *p;
4584
4585         sock = &tconn->data;
4586         p = conn_prepare_command(tconn, sock);
4587         if (!p)
4588                 return -EIO;
4589         memset(p, 0, sizeof(*p));
4590         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4591         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4592         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4593 }
4594
4595 /*
4596  * return values:
4597  *   1 yes, we have a valid connection
4598  *   0 oops, did not work out, please try again
4599  *  -1 peer talks different language,
4600  *     no point in trying again, please go standalone.
4601  */
4602 static int drbd_do_features(struct drbd_tconn *tconn)
4603 {
4604         /* ASSERT current == tconn->receiver ... */
4605         struct p_connection_features *p;
4606         const int expect = sizeof(struct p_connection_features);
4607         struct packet_info pi;
4608         int err;
4609
4610         err = drbd_send_features(tconn);
4611         if (err)
4612                 return 0;
4613
4614         err = drbd_recv_header(tconn, &pi);
4615         if (err)
4616                 return 0;
4617
4618         if (pi.cmd != P_CONNECTION_FEATURES) {
4619                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4620                          cmdname(pi.cmd), pi.cmd);
4621                 return -1;
4622         }
4623
4624         if (pi.size != expect) {
4625                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4626                      expect, pi.size);
4627                 return -1;
4628         }
4629
4630         p = pi.data;
4631         err = drbd_recv_all_warn(tconn, p, expect);
4632         if (err)
4633                 return 0;
4634
4635         p->protocol_min = be32_to_cpu(p->protocol_min);
4636         p->protocol_max = be32_to_cpu(p->protocol_max);
4637         if (p->protocol_max == 0)
4638                 p->protocol_max = p->protocol_min;
4639
4640         if (PRO_VERSION_MAX < p->protocol_min ||
4641             PRO_VERSION_MIN > p->protocol_max)
4642                 goto incompat;
4643
4644         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4645
4646         conn_info(tconn, "Handshake successful: "
4647              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4648
4649         return 1;
4650
4651  incompat:
4652         conn_err(tconn, "incompatible DRBD dialects: "
4653             "I support %d-%d, peer supports %d-%d\n",
4654             PRO_VERSION_MIN, PRO_VERSION_MAX,
4655             p->protocol_min, p->protocol_max);
4656         return -1;
4657 }
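/*
 * Example of the agreement rule above (peer numbers hypothetical):
 * assuming we support PRO_VERSION_MIN..PRO_VERSION_MAX = 86..101 and the
 * peer announces 90..110, the ranges overlap and we settle on
 * agreed_pro_version = min(101, 110) = 101.  A disjoint range, e.g. a
 * peer announcing 60..80, takes the "incompat" path instead.
 */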
4658
4659 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4660 static int drbd_do_auth(struct drbd_tconn *tconn)
4661 {
4662         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4663         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4664         return -1;
4665 }
4666 #else
4667 #define CHALLENGE_LEN 64
4668
4669 /* Return value:
4670         1 - auth succeeded,
4671         0 - failed, try again (network error),
4672         -1 - auth failed, don't try again.
4673 */
4674
4675 static int drbd_do_auth(struct drbd_tconn *tconn)
4676 {
4677         struct drbd_socket *sock;
4678         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4679         struct scatterlist sg;
4680         char *response = NULL;
4681         char *right_response = NULL;
4682         char *peers_ch = NULL;
4683         unsigned int key_len;
4684         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4685         unsigned int resp_size;
4686         struct hash_desc desc;
4687         struct packet_info pi;
4688         struct net_conf *nc;
4689         int err, rv;
4690
4691         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4692
4693         rcu_read_lock();
4694         nc = rcu_dereference(tconn->net_conf);
4695         key_len = strlen(nc->shared_secret);
4696         memcpy(secret, nc->shared_secret, key_len);
4697         rcu_read_unlock();
4698
4699         desc.tfm = tconn->cram_hmac_tfm;
4700         desc.flags = 0;
4701
4702         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4703         if (rv) {
4704                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4705                 rv = -1;
4706                 goto fail;
4707         }
4708
4709         get_random_bytes(my_challenge, CHALLENGE_LEN);
4710
4711         sock = &tconn->data;
4712         if (!conn_prepare_command(tconn, sock)) {
4713                 rv = 0;
4714                 goto fail;
4715         }
4716         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4717                                 my_challenge, CHALLENGE_LEN);
4718         if (!rv)
4719                 goto fail;
4720
4721         err = drbd_recv_header(tconn, &pi);
4722         if (err) {
4723                 rv = 0;
4724                 goto fail;
4725         }
4726
4727         if (pi.cmd != P_AUTH_CHALLENGE) {
4728                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4729                          cmdname(pi.cmd), pi.cmd);
4730                 rv = 0;
4731                 goto fail;
4732         }
4733
4734         if (pi.size > CHALLENGE_LEN * 2) {
4735                 conn_err(tconn, "AuthChallenge payload too big.\n");
4736                 rv = -1;
4737                 goto fail;
4738         }
4739
4740         peers_ch = kmalloc(pi.size, GFP_NOIO);
4741         if (peers_ch == NULL) {
4742                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4743                 rv = -1;
4744                 goto fail;
4745         }
4746
4747         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4748         if (err) {
4749                 rv = 0;
4750                 goto fail;
4751         }
4752
4753         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4754         response = kmalloc(resp_size, GFP_NOIO);
4755         if (response == NULL) {
4756                 conn_err(tconn, "kmalloc of response failed\n");
4757                 rv = -1;
4758                 goto fail;
4759         }
4760
4761         sg_init_table(&sg, 1);
4762         sg_set_buf(&sg, peers_ch, pi.size);
4763
4764         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4765         if (rv) {
4766                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4767                 rv = -1;
4768                 goto fail;
4769         }
4770
4771         if (!conn_prepare_command(tconn, sock)) {
4772                 rv = 0;
4773                 goto fail;
4774         }
4775         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4776                                 response, resp_size);
4777         if (!rv)
4778                 goto fail;
4779
4780         err = drbd_recv_header(tconn, &pi);
4781         if (err) {
4782                 rv = 0;
4783                 goto fail;
4784         }
4785
4786         if (pi.cmd != P_AUTH_RESPONSE) {
4787                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4788                          cmdname(pi.cmd), pi.cmd);
4789                 rv = 0;
4790                 goto fail;
4791         }
4792
4793         if (pi.size != resp_size) {
4794                 conn_err(tconn, "AuthResponse payload has the wrong size\n");
4795                 rv = 0;
4796                 goto fail;
4797         }
4798
4799         err = drbd_recv_all_warn(tconn, response, resp_size);
4800         if (err) {
4801                 rv = 0;
4802                 goto fail;
4803         }
4804
4805         right_response = kmalloc(resp_size, GFP_NOIO);
4806         if (right_response == NULL) {
4807                 conn_err(tconn, "kmalloc of right_response failed\n");
4808                 rv = -1;
4809                 goto fail;
4810         }
4811
4812         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4813
4814         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4815         if (rv) {
4816                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4817                 rv = -1;
4818                 goto fail;
4819         }
4820
4821         rv = !memcmp(response, right_response, resp_size);
4822
4823         if (rv)
4824                 conn_info(tconn, "Peer authenticated using %d bytes of HMAC\n",
4825                      resp_size);
4826         else
4827                 rv = -1;
4828
4829  fail:
4830         kfree(peers_ch);
4831         kfree(response);
4832         kfree(right_response);
4833
4834         return rv;
4835 }
4836 #endif
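/*
 * The authentication exchange above, schematically (both sides hold the
 * same shared secret and run the same code):
 *
 *	A -> B : P_AUTH_CHALLENGE, challenge_a (CHALLENGE_LEN random bytes)
 *	B -> A : P_AUTH_CHALLENGE, challenge_b
 *	A -> B : P_AUTH_RESPONSE,  HMAC(secret, challenge_b)
 *	B -> A : P_AUTH_RESPONSE,  HMAC(secret, challenge_a)
 *
 * Each side recomputes the HMAC over its own challenge and compares it
 * with the peer's response.
 */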
4837
4838 int drbdd_init(struct drbd_thread *thi)
4839 {
4840         struct drbd_tconn *tconn = thi->tconn;
4841         int h;
4842
4843         conn_info(tconn, "receiver (re)started\n");
4844
4845         do {
4846                 h = conn_connect(tconn);
4847                 if (h == 0) {
4848                         conn_disconnect(tconn);
4849                         schedule_timeout_interruptible(HZ);
4850                 }
4851                 if (h == -1) {
4852                         conn_warn(tconn, "Discarding network configuration.\n");
4853                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4854                 }
4855         } while (h == 0);
4856
4857         if (h > 0)
4858                 drbdd(tconn);
4859
4860         conn_disconnect(tconn);
4861
4862         conn_info(tconn, "receiver terminated\n");
4863         return 0;
4864 }
4865
4866 /* ********* acknowledge sender ******** */
4867
4868 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4869 {
4870         struct p_req_state_reply *p = pi->data;
4871         int retcode = be32_to_cpu(p->retcode);
4872
4873         if (retcode >= SS_SUCCESS) {
4874                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4875         } else {
4876                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4877                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4878                          drbd_set_st_err_str(retcode), retcode);
4879         }
4880         wake_up(&tconn->ping_wait);
4881
4882         return 0;
4883 }
4884
4885 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4886 {
4887         struct drbd_conf *mdev;
4888         struct p_req_state_reply *p = pi->data;
4889         int retcode = be32_to_cpu(p->retcode);
4890
4891         mdev = vnr_to_mdev(tconn, pi->vnr);
4892         if (!mdev)
4893                 return -EIO;
4894
4895         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4896                 D_ASSERT(tconn->agreed_pro_version < 100);
4897                 return got_conn_RqSReply(tconn, pi);
4898         }
4899
4900         if (retcode >= SS_SUCCESS) {
4901                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4902         } else {
4903                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4904                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4905                         drbd_set_st_err_str(retcode), retcode);
4906         }
4907         wake_up(&mdev->state_wait);
4908
4909         return 0;
4910 }
4911
4912 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4913 {
4914         return drbd_send_ping_ack(tconn);
4916 }
4917
4918 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4919 {
4920         /* restore idle timeout */
4921         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int * HZ;
4922         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4923                 wake_up(&tconn->ping_wait);
4924
4925         return 0;
4926 }
4927
4928 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4929 {
4930         struct drbd_conf *mdev;
4931         struct p_block_ack *p = pi->data;
4932         sector_t sector = be64_to_cpu(p->sector);
4933         int blksize = be32_to_cpu(p->blksize);
4934
4935         mdev = vnr_to_mdev(tconn, pi->vnr);
4936         if (!mdev)
4937                 return -EIO;
4938
4939         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4940
4941         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4942
4943         if (get_ldev(mdev)) {
4944                 drbd_rs_complete_io(mdev, sector);
4945                 drbd_set_in_sync(mdev, sector, blksize);
4946                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4947                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4948                 put_ldev(mdev);
4949         }
4950         dec_rs_pending(mdev);
4951         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4952
4953         return 0;
4954 }
4955
4956 static int
4957 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4958                               struct rb_root *root, const char *func,
4959                               enum drbd_req_event what, bool missing_ok)
4960 {
4961         struct drbd_request *req;
4962         struct bio_and_error m;
4963
4964         spin_lock_irq(&mdev->tconn->req_lock);
4965         req = find_request(mdev, root, id, sector, missing_ok, func);
4966         if (unlikely(!req)) {
4967                 spin_unlock_irq(&mdev->tconn->req_lock);
4968                 return -EIO;
4969         }
4970         __req_mod(req, what, &m);
4971         spin_unlock_irq(&mdev->tconn->req_lock);
4972
4973         if (m.bio)
4974                 complete_master_bio(mdev, &m);
4975         return 0;
4976 }
4977
4978 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4979 {
4980         struct drbd_conf *mdev;
4981         struct p_block_ack *p = pi->data;
4982         sector_t sector = be64_to_cpu(p->sector);
4983         int blksize = be32_to_cpu(p->blksize);
4984         enum drbd_req_event what;
4985
4986         mdev = vnr_to_mdev(tconn, pi->vnr);
4987         if (!mdev)
4988                 return -EIO;
4989
4990         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4991
4992         if (p->block_id == ID_SYNCER) {
4993                 drbd_set_in_sync(mdev, sector, blksize);
4994                 dec_rs_pending(mdev);
4995                 return 0;
4996         }
4997         switch (pi->cmd) {
4998         case P_RS_WRITE_ACK:
4999                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5000                 break;
5001         case P_WRITE_ACK:
5002                 what = WRITE_ACKED_BY_PEER;
5003                 break;
5004         case P_RECV_ACK:
5005                 what = RECV_ACKED_BY_PEER;
5006                 break;
5007         case P_SUPERSEDED:
5008                 what = CONFLICT_RESOLVED;
5009                 break;
5010         case P_RETRY_WRITE:
5011                 what = POSTPONE_WRITE;
5012                 break;
5013         default:
5014                 BUG();
5015         }
5016
5017         return validate_req_change_req_state(mdev, p->block_id, sector,
5018                                              &mdev->write_requests, __func__,
5019                                              what, false);
5020 }
5021
5022 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5023 {
5024         struct drbd_conf *mdev;
5025         struct p_block_ack *p = pi->data;
5026         sector_t sector = be64_to_cpu(p->sector);
5027         int size = be32_to_cpu(p->blksize);
5028         int err;
5029
5030         mdev = vnr_to_mdev(tconn, pi->vnr);
5031         if (!mdev)
5032                 return -EIO;
5033
5034         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5035
5036         if (p->block_id == ID_SYNCER) {
5037                 dec_rs_pending(mdev);
5038                 drbd_rs_failed_io(mdev, sector, size);
5039                 return 0;
5040         }
5041
5042         err = validate_req_change_req_state(mdev, p->block_id, sector,
5043                                             &mdev->write_requests, __func__,
5044                                             NEG_ACKED, true);
5045         if (err) {
5046                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5047                    The master bio might already be completed, therefore the
5048                    request is no longer in the collision hash. */
5049                 /* In Protocol B we might already have got a P_RECV_ACK
5050                    but then get a P_NEG_ACK afterwards. */
5051                 drbd_set_out_of_sync(mdev, sector, size);
5052         }
5053         return 0;
5054 }
5055
5056 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5057 {
5058         struct drbd_conf *mdev;
5059         struct p_block_ack *p = pi->data;
5060         sector_t sector = be64_to_cpu(p->sector);
5061
5062         mdev = vnr_to_mdev(tconn, pi->vnr);
5063         if (!mdev)
5064                 return -EIO;
5065
5066         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5067
5068         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5069             (unsigned long long)sector, be32_to_cpu(p->blksize));
5070
5071         return validate_req_change_req_state(mdev, p->block_id, sector,
5072                                              &mdev->read_requests, __func__,
5073                                              NEG_ACKED, false);
5074 }
5075
5076 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5077 {
5078         struct drbd_conf *mdev;
5079         sector_t sector;
5080         int size;
5081         struct p_block_ack *p = pi->data;
5082
5083         mdev = vnr_to_mdev(tconn, pi->vnr);
5084         if (!mdev)
5085                 return -EIO;
5086
5087         sector = be64_to_cpu(p->sector);
5088         size = be32_to_cpu(p->blksize);
5089
5090         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5091
5092         dec_rs_pending(mdev);
5093
5094         if (get_ldev_if_state(mdev, D_FAILED)) {
5095                 drbd_rs_complete_io(mdev, sector);
5096                 switch (pi->cmd) {
5097                 case P_NEG_RS_DREPLY:
5098                         drbd_rs_failed_io(mdev, sector, size);
                                /* fall through */
5099                 case P_RS_CANCEL:
5100                         break;
5101                 default:
5102                         BUG();
5103                 }
5104                 put_ldev(mdev);
5105         }
5106
5107         return 0;
5108 }
5109
5110 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5111 {
5112         struct p_barrier_ack *p = pi->data;
5113         struct drbd_conf *mdev;
5114         int vnr;
5115
5116         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5117
5118         rcu_read_lock();
5119         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5120                 if (mdev->state.conn == C_AHEAD &&
5121                     atomic_read(&mdev->ap_in_flight) == 0 &&
5122                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5123                         mdev->start_resync_timer.expires = jiffies + HZ;
5124                         add_timer(&mdev->start_resync_timer);
5125                 }
5126         }
5127         rcu_read_unlock();
5128
5129         return 0;
5130 }
5131
5132 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5133 {
5134         struct drbd_conf *mdev;
5135         struct p_block_ack *p = pi->data;
5136         struct drbd_work *w;
5137         sector_t sector;
5138         int size;
5139
5140         mdev = vnr_to_mdev(tconn, pi->vnr);
5141         if (!mdev)
5142                 return -EIO;
5143
5144         sector = be64_to_cpu(p->sector);
5145         size = be32_to_cpu(p->blksize);
5146
5147         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5148
5149         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5150                 drbd_ov_out_of_sync_found(mdev, sector, size);
5151         else
5152                 ov_out_of_sync_print(mdev);
5153
5154         if (!get_ldev(mdev))
5155                 return 0;
5156
5157         drbd_rs_complete_io(mdev, sector);
5158         dec_rs_pending(mdev);
5159
5160         --mdev->ov_left;
5161
5162         /* let's advance progress step marks only for every other megabyte */
5163         if ((mdev->ov_left & 0x200) == 0x200)
5164                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5165
5166         if (mdev->ov_left == 0) {
5167                 w = kmalloc(sizeof(*w), GFP_NOIO);
5168                 if (w) {
5169                         w->cb = w_ov_finished;
5170                         w->mdev = mdev;
5171                         drbd_queue_work(&mdev->tconn->sender_work, w);
5172                 } else {
5173                         dev_err(DEV, "kmalloc(w) failed.");
5174                         ov_out_of_sync_print(mdev);
5175                         drbd_resync_finished(mdev);
5176                 }
5177         }
5178         put_ldev(mdev);
5179         return 0;
5180 }
5181
5182 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5183 {
5184         return 0;
5185 }
5186
5187 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5188 {
5189         struct drbd_conf *mdev;
5190         int vnr, not_empty = 0;
5191
5192         do {
5193                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5194                 flush_signals(current);
5195
5196                 rcu_read_lock();
5197                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5198                         kref_get(&mdev->kref);
5199                         rcu_read_unlock();
5200                         if (drbd_finish_peer_reqs(mdev)) {
5201                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5202                                 return 1;
5203                         }
5204                         kref_put(&mdev->kref, &drbd_minor_destroy);
5205                         rcu_read_lock();
5206                 }
5207                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5208
5209                 spin_lock_irq(&tconn->req_lock);
5210                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5211                         not_empty = !list_empty(&mdev->done_ee);
5212                         if (not_empty)
5213                                 break;
5214                 }
5215                 spin_unlock_irq(&tconn->req_lock);
5216                 rcu_read_unlock();
5217         } while (not_empty);
5218
5219         return 0;
5220 }
5221
5222 struct asender_cmd {
5223         size_t pkt_size;
5224         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5225 };
5226
5227 static struct asender_cmd asender_tbl[] = {
5228         [P_PING]            = { 0, got_Ping },
5229         [P_PING_ACK]        = { 0, got_PingAck },
5230         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5231         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5232         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5233         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5234         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5235         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5236         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5237         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5238         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5239         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5240         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5241         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5242         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5243         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5244         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5245 };
5246
5247 int drbd_asender(struct drbd_thread *thi)
5248 {
5249         struct drbd_tconn *tconn = thi->tconn;
5250         struct asender_cmd *cmd = NULL;
5251         struct packet_info pi;
5252         int rv;
5253         void *buf    = tconn->meta.rbuf;
5254         int received = 0;
5255         unsigned int header_size = drbd_header_size(tconn);
5256         int expect   = header_size;
5257         bool ping_timeout_active = false;
5258         struct net_conf *nc;
5259         int ping_timeo, tcp_cork, ping_int;
5260
5261         current->policy = SCHED_RR;  /* Make this a realtime task! */
5262         current->rt_priority = 2;    /* more important than all other tasks */
5263
5264         while (get_t_state(thi) == RUNNING) {
5265                 drbd_thread_current_set_cpu(thi);
5266
5267                 rcu_read_lock();
5268                 nc = rcu_dereference(tconn->net_conf);
5269                 ping_timeo = nc->ping_timeo;
5270                 tcp_cork = nc->tcp_cork;
5271                 ping_int = nc->ping_int;
5272                 rcu_read_unlock();
5273
5274                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5275                         if (drbd_send_ping(tconn)) {
5276                                 conn_err(tconn, "drbd_send_ping has failed\n");
5277                                 goto reconnect;
5278                         }
5279                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5280                         ping_timeout_active = true;
5281                 }
5282
5283                 /* TODO: conditionally cork; it may hurt latency if we cork without
5284                    much to send */
5285                 if (tcp_cork)
5286                         drbd_tcp_cork(tconn->meta.socket);
5287                 if (tconn_finish_peer_reqs(tconn)) {
5288                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5289                         goto reconnect;
5290                 }
5291                 /* but unconditionally uncork unless disabled */
5292                 if (tcp_cork)
5293                         drbd_tcp_uncork(tconn->meta.socket);
5294
5295                 /* short circuit, recv_msg would return EINTR anyway. */
5296                 if (signal_pending(current))
5297                         continue;
5298
5299                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5300                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5301
5302                 flush_signals(current);
5303
5304                 /* Note:
5305                  * -EINTR        (on meta) we got a signal
5306                  * -EAGAIN       (on meta) rcvtimeo expired
5307                  * -ECONNRESET   other side closed the connection
5308                  * -ERESTARTSYS  (on data) we got a signal
5309                  * rv <  0       other than above: unexpected error!
5310                  * rv == expected: full header or command
5311                  * rv <  expected: "woken" by signal during receive
5312                  * rv == 0       : "connection shut down by peer"
5313                  */
5314                 if (likely(rv > 0)) {
5315                         received += rv;
5316                         buf      += rv;
5317                 } else if (rv == 0) {
5318                         if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
5319                                 long t;
5320                                 rcu_read_lock();
5321                                 t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
5322                                 rcu_read_unlock();
5323
5324                                 t = wait_event_timeout(tconn->ping_wait,
5325                                                        tconn->cstate < C_WF_REPORT_PARAMS,
5326                                                        t);
5327                                 if (t)
5328                                         break;
5329                         }
5330                         conn_err(tconn, "meta connection shut down by peer.\n");
5331                         goto reconnect;
5332                 } else if (rv == -EAGAIN) {
5333                         /* If the data socket received something meanwhile,
5334                          * that is good enough: peer is still alive. */
5335                         if (time_after(tconn->last_received,
5336                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5337                                 continue;
5338                         if (ping_timeout_active) {
5339                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5340                                 goto reconnect;
5341                         }
5342                         set_bit(SEND_PING, &tconn->flags);
5343                         continue;
5344                 } else if (rv == -EINTR) {
5345                         continue;
5346                 } else {
5347                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5348                         goto reconnect;
5349                 }
5350
5351                 if (received == expect && cmd == NULL) {
5352                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5353                                 goto reconnect;
5354                         cmd = &asender_tbl[pi.cmd];
5355                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5356                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5357                                          cmdname(pi.cmd), pi.cmd);
5358                                 goto disconnect;
5359                         }
5360                         expect = header_size + cmd->pkt_size;
5361                         if (pi.size != expect - header_size) {
5362                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5363                                         pi.cmd, pi.size);
5364                                 goto reconnect;
5365                         }
5366                 }
5367                 if (received == expect) {
5368                         int err;
5369
5370                         err = cmd->fn(tconn, &pi);
5371                         if (err) {
5372                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5373                                 goto reconnect;
5374                         }
5375
5376                         tconn->last_received = jiffies;
5377
5378                         if (cmd == &asender_tbl[P_PING_ACK]) {
5379                                 /* restore idle timeout */
5380                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5381                                 ping_timeout_active = false;
5382                         }
5383
5384                         buf      = tconn->meta.rbuf;
5385                         received = 0;
5386                         expect   = header_size;
5387                         cmd      = NULL;
5388                 }
5389         }
5390
5391         if (0) {
5392 reconnect:
5393                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5394                 conn_md_sync(tconn);
5395         }
5396         if (0) {
5397 disconnect:
5398                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5399         }
5400         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5401
5402         conn_info(tconn, "asender terminated\n");
5403
5404         return 0;
5405 }