drbd: Avoid NetworkFailure state during disconnect
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
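
/*
 * Illustrative sketch only, not used by the driver: how three pages could be
 * linked and walked with page->private as the "next" pointer, in the style of
 * the helpers above.  It assumes the chain's last element carries 0 as its
 * end-of-list marker, just as page_chain_del() sets it up.
 */
static inline unsigned int page_chain_example_count(struct page *p1,
						    struct page *p2,
						    struct page *p3)
{
	struct page *cur;
	unsigned int n = 0;

	/* link p1 -> p2 -> p3 -> end of chain */
	set_page_private(p3, 0);
	set_page_private(p2, (unsigned long)p3);
	set_page_private(p1, (unsigned long)p2);

	/* walk the chain the same way page_chain_next() would */
	for (cur = p1; cur; cur = (struct page *)page_private(cur))
		n++;

	return n; /* 3 */
}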
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock) section;
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (page == NULL)
299                 return;
300
301         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
302                 i = page_chain_free(page);
303         else {
304                 struct page *tmp;
305                 tmp = page_chain_tail(page, &i);
306                 spin_lock(&drbd_pp_lock);
307                 page_chain_add(&drbd_pp_pool, page, tmp);
308                 drbd_pp_vacant += i;
309                 spin_unlock(&drbd_pp_lock);
310         }
311         i = atomic_sub_return(i, a);
312         if (i < 0)
313                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
314                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
315         wake_up(&drbd_pp_wait);
316 }
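
/*
 * Hedged usage sketch, not part of the driver: the typical pairing of
 * drbd_alloc_pages() and drbd_free_pages().  The count of 3 pages and the
 * blocking retry are arbitrary choices for illustration only.
 */
static inline int drbd_page_pool_usage_example(struct drbd_conf *mdev)
{
	struct page *chain;

	/* may block until the pool has room, unless a signal interrupts us */
	chain = drbd_alloc_pages(mdev, 3, true);
	if (!chain)
		return -ENOMEM;

	/* ... the pages would normally be filled and submitted here ... */

	/* hand the chain back; 0 means it was accounted in pp_in_use */
	drbd_free_pages(mdev, chain, 0);
	return 0;
}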
317
318 /*
319 You need to hold the req_lock:
320  _drbd_wait_ee_list_empty()
321
322 You must not have the req_lock:
323  drbd_free_peer_req()
324  drbd_alloc_peer_req()
325  drbd_free_peer_reqs()
326  drbd_ee_fix_bhs()
327  drbd_finish_peer_reqs()
328  drbd_clear_done_ee()
329  drbd_wait_ee_list_empty()
330 */
331
332 struct drbd_peer_request *
333 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
334                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
335 {
336         struct drbd_peer_request *peer_req;
337         struct page *page = NULL;
338         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
339
340         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
341                 return NULL;
342
343         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
344         if (!peer_req) {
345                 if (!(gfp_mask & __GFP_NOWARN))
346                         dev_err(DEV, "%s: allocation failed\n", __func__);
347                 return NULL;
348         }
349
350         if (data_size) {
351                 page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
352                 if (!page)
353                         goto fail;
354         }
355
356         drbd_clear_interval(&peer_req->i);
357         peer_req->i.size = data_size;
358         peer_req->i.sector = sector;
359         peer_req->i.local = false;
360         peer_req->i.waiting = false;
361
362         peer_req->epoch = NULL;
363         peer_req->w.mdev = mdev;
364         peer_req->pages = page;
365         atomic_set(&peer_req->pending_bios, 0);
366         peer_req->flags = 0;
367         /*
368          * The block_id is opaque to the receiver.  It is not endianness
369          * converted, and sent back to the sender unchanged.
370          */
371         peer_req->block_id = id;
372
373         return peer_req;
374
375  fail:
376         mempool_free(peer_req, drbd_ee_mempool);
377         return NULL;
378 }
379
380 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
381                        int is_net)
382 {
383         if (peer_req->flags & EE_HAS_DIGEST)
384                 kfree(peer_req->digest);
385         drbd_free_pages(mdev, peer_req->pages, is_net);
386         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
387         D_ASSERT(drbd_interval_empty(&peer_req->i));
388         mempool_free(peer_req, drbd_ee_mempool);
389 }
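
/*
 * Hedged lifecycle sketch, illustration only: allocate a peer request
 * covering one 4KiB block and release it again.  GFP_NOIO mirrors what
 * read_in_block() below uses; the block_id of 0 is arbitrary, and a real
 * caller must hold a local-disk reference (__must_hold(local)).
 */
static inline int peer_req_lifecycle_example(struct drbd_conf *mdev,
					     sector_t sector)
{
	struct drbd_peer_request *peer_req;

	peer_req = drbd_alloc_peer_req(mdev, 0ULL, sector, 4096, GFP_NOIO);
	if (!peer_req)
		return -ENOMEM;

	/* ... the payload would normally be received and submitted here ... */

	__drbd_free_peer_req(mdev, peer_req, 0 /* not accounted as net_ee */);
	return 0;
}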
390
391 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
392 {
393         LIST_HEAD(work_list);
394         struct drbd_peer_request *peer_req, *t;
395         int count = 0;
396         int is_net = list == &mdev->net_ee;
397
398         spin_lock_irq(&mdev->tconn->req_lock);
399         list_splice_init(list, &work_list);
400         spin_unlock_irq(&mdev->tconn->req_lock);
401
402         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
403                 __drbd_free_peer_req(mdev, peer_req, is_net);
404                 count++;
405         }
406         return count;
407 }
408
409 /*
410  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
411  */
412 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
413 {
414         LIST_HEAD(work_list);
415         LIST_HEAD(reclaimed);
416         struct drbd_peer_request *peer_req, *t;
417         int err = 0;
418
419         spin_lock_irq(&mdev->tconn->req_lock);
420         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
421         list_splice_init(&mdev->done_ee, &work_list);
422         spin_unlock_irq(&mdev->tconn->req_lock);
423
424         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
425                 drbd_free_net_peer_req(mdev, peer_req);
426
427         /* possible callbacks here:
428          * e_end_block, e_end_resync_block and e_send_superseded;
429          * all of them ignore the last argument.
430          */
431         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
432                 int err2;
433
434                 /* list_del not necessary, next/prev members not touched */
435                 err2 = peer_req->w.cb(&peer_req->w, !!err);
436                 if (!err)
437                         err = err2;
438                 drbd_free_peer_req(mdev, peer_req);
439         }
440         wake_up(&mdev->ee_wait);
441
442         return err;
443 }
444
445 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
446                                      struct list_head *head)
447 {
448         DEFINE_WAIT(wait);
449
450         /* avoids spin_lock/unlock
451          * and calling prepare_to_wait in the fast path */
452         while (!list_empty(head)) {
453                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
454                 spin_unlock_irq(&mdev->tconn->req_lock);
455                 io_schedule();
456                 finish_wait(&mdev->ee_wait, &wait);
457                 spin_lock_irq(&mdev->tconn->req_lock);
458         }
459 }
460
461 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
462                                     struct list_head *head)
463 {
464         spin_lock_irq(&mdev->tconn->req_lock);
465         _drbd_wait_ee_list_empty(mdev, head);
466         spin_unlock_irq(&mdev->tconn->req_lock);
467 }
468
469 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
470 {
471         mm_segment_t oldfs;
472         struct kvec iov = {
473                 .iov_base = buf,
474                 .iov_len = size,
475         };
476         struct msghdr msg = {
477                 .msg_iovlen = 1,
478                 .msg_iov = (struct iovec *)&iov,
479                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
480         };
481         int rv;
482
483         oldfs = get_fs();
484         set_fs(KERNEL_DS);
485         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
486         set_fs(oldfs);
487
488         return rv;
489 }
490
491 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
492 {
493         mm_segment_t oldfs;
494         struct kvec iov = {
495                 .iov_base = buf,
496                 .iov_len = size,
497         };
498         struct msghdr msg = {
499                 .msg_iovlen = 1,
500                 .msg_iov = (struct iovec *)&iov,
501                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
502         };
503         int rv;
504
505         oldfs = get_fs();
506         set_fs(KERNEL_DS);
507
508         for (;;) {
509                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
510                 if (rv == size)
511                         break;
512
513                 /* Note:
514                  * ECONNRESET   other side closed the connection
515                  * ERESTARTSYS  (on  sock) we got a signal
516                  */
517
518                 if (rv < 0) {
519                         if (rv == -ECONNRESET)
520                                 conn_info(tconn, "sock was reset by peer\n");
521                         else if (rv != -ERESTARTSYS)
522                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
523                         break;
524                 } else if (rv == 0) {
525                         break;
526                 } else  {
527                         /* signal came in, or peer/link went down,
528                          * after we read a partial message
529                          */
530                         /* D_ASSERT(signal_pending(current)); */
531                         break;
532                 }
533         };
534
535         set_fs(oldfs);
536
537         if (rv == 0) {
538                 if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
539                         long t;
540                         rcu_read_lock();
541                         t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
542                         rcu_read_unlock();
543
544                         t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);
545
546                         if (t)
547                                 goto out;
548                 }
549                 conn_info(tconn, "sock was shut down by peer\n");
550         }
551
552         if (rv != size)
553                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
554
555 out:
556         return rv;
557 }
558
559 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
560 {
561         int err;
562
563         err = drbd_recv(tconn, buf, size);
564         if (err != size) {
565                 if (err >= 0)
566                         err = -EIO;
567         } else
568                 err = 0;
569         return err;
570 }
571
572 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
573 {
574         int err;
575
576         err = drbd_recv_all(tconn, buf, size);
577         if (err && !signal_pending(current))
578                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
579         return err;
580 }
581
582 /* quoting tcp(7):
583  *   On individual connections, the socket buffer size must be set prior to the
584  *   listen(2) or connect(2) calls in order to have it take effect.
585  * This is our wrapper to do so.
586  */
587 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
588                 unsigned int rcv)
589 {
590         /* open coded SO_SNDBUF, SO_RCVBUF */
591         if (snd) {
592                 sock->sk->sk_sndbuf = snd;
593                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
594         }
595         if (rcv) {
596                 sock->sk->sk_rcvbuf = rcv;
597                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
598         }
599 }
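
/*
 * Minimal ordering sketch, assumptions only: given an already created kernel
 * TCP socket and a filled-in peer address, the buffer sizes must be fixed
 * before connect()/listen() per tcp(7).  This is why the functions below call
 * drbd_setbufsize() before bind()/connect(); the 128 KiB sizes here are made
 * up for the example.
 */
static inline int drbd_setbufsize_before_connect_example(struct socket *sock,
							  struct sockaddr *peer,
							  int peer_len)
{
	drbd_setbufsize(sock, 128 << 10, 128 << 10);	/* before connect()! */
	return sock->ops->connect(sock, peer, peer_len, 0);
}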
600
601 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
602 {
603         const char *what;
604         struct socket *sock;
605         struct sockaddr_in6 src_in6;
606         struct sockaddr_in6 peer_in6;
607         struct net_conf *nc;
608         int err, peer_addr_len, my_addr_len;
609         int sndbuf_size, rcvbuf_size, connect_int;
610         int disconnect_on_error = 1;
611
612         rcu_read_lock();
613         nc = rcu_dereference(tconn->net_conf);
614         if (!nc) {
615                 rcu_read_unlock();
616                 return NULL;
617         }
618         sndbuf_size = nc->sndbuf_size;
619         rcvbuf_size = nc->rcvbuf_size;
620         connect_int = nc->connect_int;
621         rcu_read_unlock();
622
623         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
624         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
625
626         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
627                 src_in6.sin6_port = 0;
628         else
629                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
630
631         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
632         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
633
634         what = "sock_create_kern";
635         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
636                                SOCK_STREAM, IPPROTO_TCP, &sock);
637         if (err < 0) {
638                 sock = NULL;
639                 goto out;
640         }
641
642         sock->sk->sk_rcvtimeo =
643         sock->sk->sk_sndtimeo = connect_int * HZ;
644         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
645
646        /* explicitly bind to the configured IP as source IP
647         *  for the outgoing connections.
648         *  This is needed for multihomed hosts and to be
649         *  able to use lo: interfaces for drbd.
650         * Make sure to use 0 as port number, so linux selects
651         *  a free one dynamically.
652         */
653         what = "bind before connect";
654         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
655         if (err < 0)
656                 goto out;
657
658         /* connect may fail, peer not yet available.
659          * stay C_WF_CONNECTION, don't go Disconnecting! */
660         disconnect_on_error = 0;
661         what = "connect";
662         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
663
664 out:
665         if (err < 0) {
666                 if (sock) {
667                         sock_release(sock);
668                         sock = NULL;
669                 }
670                 switch (-err) {
671                         /* timeout, busy, signal pending */
672                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
673                 case EINTR: case ERESTARTSYS:
674                         /* peer not (yet) available, network problem */
675                 case ECONNREFUSED: case ENETUNREACH:
676                 case EHOSTDOWN:    case EHOSTUNREACH:
677                         disconnect_on_error = 0;
678                         break;
679                 default:
680                         conn_err(tconn, "%s failed, err = %d\n", what, err);
681                 }
682                 if (disconnect_on_error)
683                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
684         }
685
686         return sock;
687 }
688
689 struct accept_wait_data {
690         struct drbd_tconn *tconn;
691         struct socket *s_listen;
692         struct completion door_bell;
693         void (*original_sk_state_change)(struct sock *sk);
694
695 };
696
697 static void drbd_incoming_connection(struct sock *sk)
698 {
699         struct accept_wait_data *ad = sk->sk_user_data;
700         void (*state_change)(struct sock *sk);
701
702         state_change = ad->original_sk_state_change;
703         if (sk->sk_state == TCP_ESTABLISHED)
704                 complete(&ad->door_bell);
705         state_change(sk);
706 }
707
708 static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
709 {
710         int err, sndbuf_size, rcvbuf_size, my_addr_len;
711         struct sockaddr_in6 my_addr;
712         struct socket *s_listen;
713         struct net_conf *nc;
714         const char *what;
715
716         rcu_read_lock();
717         nc = rcu_dereference(tconn->net_conf);
718         if (!nc) {
719                 rcu_read_unlock();
720                 return -EIO;
721         }
722         sndbuf_size = nc->sndbuf_size;
723         rcvbuf_size = nc->rcvbuf_size;
724         rcu_read_unlock();
725
726         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
727         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
728
729         what = "sock_create_kern";
730         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
731                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
732         if (err) {
733                 s_listen = NULL;
734                 goto out;
735         }
736
737         s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
738         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
739
740         what = "bind before listen";
741         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
742         if (err < 0)
743                 goto out;
744
745         ad->s_listen = s_listen;
746         write_lock_bh(&s_listen->sk->sk_callback_lock);
747         ad->original_sk_state_change = s_listen->sk->sk_state_change;
748         s_listen->sk->sk_state_change = drbd_incoming_connection;
749         s_listen->sk->sk_user_data = ad;
750         write_unlock_bh(&s_listen->sk->sk_callback_lock);
751
752         what = "listen";
753         err = s_listen->ops->listen(s_listen, 5);
754         if (err < 0)
755                 goto out;
756
757         return 0;
758 out:
759         if (s_listen)
760                 sock_release(s_listen);
761         if (err < 0) {
762                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
763                         conn_err(tconn, "%s failed, err = %d\n", what, err);
764                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
765                 }
766         }
767
768         return -EIO;
769 }
770
771 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
772 {
773         write_lock_bh(&sk->sk_callback_lock);
774         sk->sk_state_change = ad->original_sk_state_change;
775         sk->sk_user_data = NULL;
776         write_unlock_bh(&sk->sk_callback_lock);
777 }
778
779 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
780 {
781         int timeo, connect_int, err = 0;
782         struct socket *s_estab = NULL;
783         struct net_conf *nc;
784
785         rcu_read_lock();
786         nc = rcu_dereference(tconn->net_conf);
787         if (!nc) {
788                 rcu_read_unlock();
789                 return NULL;
790         }
791         connect_int = nc->connect_int;
792         rcu_read_unlock();
793
794         timeo = connect_int * HZ;
795         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
796
797         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
798         if (err <= 0)
799                 return NULL;
800
801         err = kernel_accept(ad->s_listen, &s_estab, 0);
802         if (err < 0) {
803                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
804                         conn_err(tconn, "accept failed, err = %d\n", err);
805                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
806                 }
807         }
808
809         if (s_estab)
810                 unregister_state_change(s_estab->sk, ad);
811
812         return s_estab;
813 }
814
815 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
816
817 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
818                              enum drbd_packet cmd)
819 {
820         if (!conn_prepare_command(tconn, sock))
821                 return -EIO;
822         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
823 }
824
825 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
826 {
827         unsigned int header_size = drbd_header_size(tconn);
828         struct packet_info pi;
829         int err;
830
831         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
832         if (err != header_size) {
833                 if (err >= 0)
834                         err = -EIO;
835                 return err;
836         }
837         err = decode_header(tconn, tconn->data.rbuf, &pi);
838         if (err)
839                 return err;
840         return pi.cmd;
841 }
842
843 /**
844  * drbd_socket_okay() - Free the socket if its connection is not okay
845  * @sock:       pointer to the pointer to the socket.
846  */
847 static int drbd_socket_okay(struct socket **sock)
848 {
849         int rr;
850         char tb[4];
851
852         if (!*sock)
853                 return false;
854
855         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
856
857         if (rr > 0 || rr == -EAGAIN) {
858                 return true;
859         } else {
860                 sock_release(*sock);
861                 *sock = NULL;
862                 return false;
863         }
864 }
865 /* Gets called if a connection is established, or if a new minor gets created
866    in a connection */
867 int drbd_connected(struct drbd_conf *mdev)
868 {
869         int err;
870
871         atomic_set(&mdev->packet_seq, 0);
872         mdev->peer_seq = 0;
873
874         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
875                 &mdev->tconn->cstate_mutex :
876                 &mdev->own_state_mutex;
877
878         err = drbd_send_sync_param(mdev);
879         if (!err)
880                 err = drbd_send_sizes(mdev, 0, 0);
881         if (!err)
882                 err = drbd_send_uuids(mdev);
883         if (!err)
884                 err = drbd_send_current_state(mdev);
885         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
886         clear_bit(RESIZE_PENDING, &mdev->flags);
887         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
888         return err;
889 }
890
891 /*
892  * return values:
893  *   1 yes, we have a valid connection
894  *   0 oops, did not work out, please try again
895  *  -1 peer talks different language,
896  *     no point in trying again, please go standalone.
897  *  -2 We do not have a network config...
898  */
899 static int conn_connect(struct drbd_tconn *tconn)
900 {
901         struct drbd_socket sock, msock;
902         struct drbd_conf *mdev;
903         struct net_conf *nc;
904         int vnr, timeout, h, ok;
905         bool discard_my_data;
906         enum drbd_state_rv rv;
907         struct accept_wait_data ad = {
908                 .tconn = tconn,
909                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
910         };
911
912         clear_bit(DISCONNECT_SENT, &tconn->flags);
913         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
914                 return -2;
915
916         mutex_init(&sock.mutex);
917         sock.sbuf = tconn->data.sbuf;
918         sock.rbuf = tconn->data.rbuf;
919         sock.socket = NULL;
920         mutex_init(&msock.mutex);
921         msock.sbuf = tconn->meta.sbuf;
922         msock.rbuf = tconn->meta.rbuf;
923         msock.socket = NULL;
924
925         /* Assume that the peer only understands protocol 80 until we know better.  */
926         tconn->agreed_pro_version = 80;
927
928         if (prepare_listen_socket(tconn, &ad))
929                 return 0;
930
931         do {
932                 struct socket *s;
933
934                 s = drbd_try_connect(tconn);
935                 if (s) {
936                         if (!sock.socket) {
937                                 sock.socket = s;
938                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
939                         } else if (!msock.socket) {
940                                 clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
941                                 msock.socket = s;
942                                 send_first_packet(tconn, &msock, P_INITIAL_META);
943                         } else {
944                                 conn_err(tconn, "Logic error in conn_connect()\n");
945                                 goto out_release_sockets;
946                         }
947                 }
948
949                 if (sock.socket && msock.socket) {
950                         rcu_read_lock();
951                         nc = rcu_dereference(tconn->net_conf);
952                         timeout = nc->ping_timeo * HZ / 10;
953                         rcu_read_unlock();
954                         schedule_timeout_interruptible(timeout);
955                         ok = drbd_socket_okay(&sock.socket);
956                         ok = drbd_socket_okay(&msock.socket) && ok;
957                         if (ok)
958                                 break;
959                 }
960
961 retry:
962                 s = drbd_wait_for_connect(tconn, &ad);
963                 if (s) {
964                         int fp = receive_first_packet(tconn, s);
965                         drbd_socket_okay(&sock.socket);
966                         drbd_socket_okay(&msock.socket);
967                         switch (fp) {
968                         case P_INITIAL_DATA:
969                                 if (sock.socket) {
970                                         conn_warn(tconn, "initial packet S crossed\n");
971                                         sock_release(sock.socket);
972                                         sock.socket = s;
973                                         goto randomize;
974                                 }
975                                 sock.socket = s;
976                                 break;
977                         case P_INITIAL_META:
978                                 set_bit(RESOLVE_CONFLICTS, &tconn->flags);
979                                 if (msock.socket) {
980                                         conn_warn(tconn, "initial packet M crossed\n");
981                                         sock_release(msock.socket);
982                                         msock.socket = s;
983                                         goto randomize;
984                                 }
985                                 msock.socket = s;
986                                 break;
987                         default:
988                                 conn_warn(tconn, "Error receiving initial packet\n");
989                                 sock_release(s);
990 randomize:
991                                 if (random32() & 1)
992                                         goto retry;
993                         }
994                 }
995
996                 if (tconn->cstate <= C_DISCONNECTING)
997                         goto out_release_sockets;
998                 if (signal_pending(current)) {
999                         flush_signals(current);
1000                         smp_rmb();
1001                         if (get_t_state(&tconn->receiver) == EXITING)
1002                                 goto out_release_sockets;
1003                 }
1004
1005                 ok = drbd_socket_okay(&sock.socket);
1006                 ok = drbd_socket_okay(&msock.socket) && ok;
1007         } while (!ok);
1008
1009         if (ad.s_listen)
1010                 sock_release(ad.s_listen);
1011
1012         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
1013         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
1014
1015         sock.socket->sk->sk_allocation = GFP_NOIO;
1016         msock.socket->sk->sk_allocation = GFP_NOIO;
1017
1018         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1019         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1020
1021         /* NOT YET ...
1022          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
1023          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1024          * first set it to the P_CONNECTION_FEATURES timeout,
1025          * which we set to 4x the configured ping_timeout. */
1026         rcu_read_lock();
1027         nc = rcu_dereference(tconn->net_conf);
1028
1029         sock.socket->sk->sk_sndtimeo =
1030         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1031
1032         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1033         timeout = nc->timeout * HZ / 10;
1034         discard_my_data = nc->discard_my_data;
1035         rcu_read_unlock();
1036
1037         msock.socket->sk->sk_sndtimeo = timeout;
1038
1039         /* we don't want delays.
1040          * we use TCP_CORK where appropriate, though */
1041         drbd_tcp_nodelay(sock.socket);
1042         drbd_tcp_nodelay(msock.socket);
1043
1044         tconn->data.socket = sock.socket;
1045         tconn->meta.socket = msock.socket;
1046         tconn->last_received = jiffies;
1047
1048         h = drbd_do_features(tconn);
1049         if (h <= 0)
1050                 return h;
1051
1052         if (tconn->cram_hmac_tfm) {
1053                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
1054                 switch (drbd_do_auth(tconn)) {
1055                 case -1:
1056                         conn_err(tconn, "Authentication of peer failed\n");
1057                         return -1;
1058                 case 0:
1059                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1060                         return 0;
1061                 }
1062         }
1063
1064         tconn->data.socket->sk->sk_sndtimeo = timeout;
1065         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1066
1067         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1068                 return -1;
1069
1070         set_bit(STATE_SENT, &tconn->flags);
1071
1072         rcu_read_lock();
1073         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1074                 kref_get(&mdev->kref);
1075                 rcu_read_unlock();
1076
1077                 if (discard_my_data)
1078                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1079                 else
1080                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1081
1082                 drbd_connected(mdev);
1083                 kref_put(&mdev->kref, &drbd_minor_destroy);
1084                 rcu_read_lock();
1085         }
1086         rcu_read_unlock();
1087
1088         rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1089         if (rv < SS_SUCCESS) {
1090                 clear_bit(STATE_SENT, &tconn->flags);
1091                 return 0;
1092         }
1093
1094         drbd_thread_start(&tconn->asender);
1095
1096         mutex_lock(&tconn->conf_update);
1097         /* The discard_my_data flag is a single-shot modifier to the next
1098          * connection attempt, the handshake of which is now well underway.
1099          * No need for rcu style copying of the whole struct
1100          * just to clear a single value. */
1101         tconn->net_conf->discard_my_data = 0;
1102         mutex_unlock(&tconn->conf_update);
1103
1104         return h;
1105
1106 out_release_sockets:
1107         if (ad.s_listen)
1108                 sock_release(ad.s_listen);
1109         if (sock.socket)
1110                 sock_release(sock.socket);
1111         if (msock.socket)
1112                 sock_release(msock.socket);
1113         return -1;
1114 }
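
/*
 * Hedged sketch of how a caller could act on conn_connect()'s documented
 * return codes; the real retry policy lives in the receiver thread, this
 * helper exists purely to illustrate the contract described above.
 */
static inline bool conn_connect_should_retry_example(struct drbd_tconn *tconn)
{
	int h = conn_connect(tconn);

	if (h > 0)		/* 1: we have a valid connection */
		return false;
	if (h == 0)		/* 0: did not work out, please try again */
		return true;
	/* -1: peer talks a different language, -2: no network config */
	return false;
}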
1115
1116 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1117 {
1118         unsigned int header_size = drbd_header_size(tconn);
1119
1120         if (header_size == sizeof(struct p_header100) &&
1121             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1122                 struct p_header100 *h = header;
1123                 if (h->pad != 0) {
1124                         conn_err(tconn, "Header padding is not zero\n");
1125                         return -EINVAL;
1126                 }
1127                 pi->vnr = be16_to_cpu(h->volume);
1128                 pi->cmd = be16_to_cpu(h->command);
1129                 pi->size = be32_to_cpu(h->length);
1130         } else if (header_size == sizeof(struct p_header95) &&
1131                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1132                 struct p_header95 *h = header;
1133                 pi->cmd = be16_to_cpu(h->command);
1134                 pi->size = be32_to_cpu(h->length);
1135                 pi->vnr = 0;
1136         } else if (header_size == sizeof(struct p_header80) &&
1137                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1138                 struct p_header80 *h = header;
1139                 pi->cmd = be16_to_cpu(h->command);
1140                 pi->size = be16_to_cpu(h->length);
1141                 pi->vnr = 0;
1142         } else {
1143                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1144                          be32_to_cpu(*(__be32 *)header),
1145                          tconn->agreed_pro_version);
1146                 return -EINVAL;
1147         }
1148         pi->data = header + header_size;
1149         return 0;
1150 }
1151
1152 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1153 {
1154         void *buffer = tconn->data.rbuf;
1155         int err;
1156
1157         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1158         if (err)
1159                 return err;
1160
1161         err = decode_header(tconn, buffer, pi);
1162         tconn->last_received = jiffies;
1163
1164         return err;
1165 }
1166
1167 static void drbd_flush(struct drbd_tconn *tconn)
1168 {
1169         int rv;
1170         struct drbd_conf *mdev;
1171         int vnr;
1172
1173         if (tconn->write_ordering >= WO_bdev_flush) {
1174                 rcu_read_lock();
1175                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1176                         if (!get_ldev(mdev))
1177                                 continue;
1178                         kref_get(&mdev->kref);
1179                         rcu_read_unlock();
1180
1181                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1182                                         GFP_NOIO, NULL);
1183                         if (rv) {
1184                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1185                                 /* would rather check for EOPNOTSUPP, but that is not reliable.
1186                                  * don't try again for ANY return value != 0
1187                                  * if (rv == -EOPNOTSUPP) */
1188                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1189                         }
1190                         put_ldev(mdev);
1191                         kref_put(&mdev->kref, &drbd_minor_destroy);
1192
1193                         rcu_read_lock();
1194                         if (rv)
1195                                 break;
1196                 }
1197                 rcu_read_unlock();
1198         }
1199 }
1200
1201 /**
1202  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1203  * @tconn:      DRBD connection.
1204  * @epoch:      Epoch object.
1205  * @ev:         Epoch event.
1206  */
1207 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1208                                                struct drbd_epoch *epoch,
1209                                                enum epoch_event ev)
1210 {
1211         int epoch_size;
1212         struct drbd_epoch *next_epoch;
1213         enum finish_epoch rv = FE_STILL_LIVE;
1214
1215         spin_lock(&tconn->epoch_lock);
1216         do {
1217                 next_epoch = NULL;
1218
1219                 epoch_size = atomic_read(&epoch->epoch_size);
1220
1221                 switch (ev & ~EV_CLEANUP) {
1222                 case EV_PUT:
1223                         atomic_dec(&epoch->active);
1224                         break;
1225                 case EV_GOT_BARRIER_NR:
1226                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1227                         break;
1228                 case EV_BECAME_LAST:
1229                         /* nothing to do*/
1230                         break;
1231                 }
1232
1233                 if (epoch_size != 0 &&
1234                     atomic_read(&epoch->active) == 0 &&
1235                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1236                         if (!(ev & EV_CLEANUP)) {
1237                                 spin_unlock(&tconn->epoch_lock);
1238                                 drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1239                                 spin_lock(&tconn->epoch_lock);
1240                         }
1241 #if 0
1242                         /* FIXME: dec unacked on connection, once we have
1243                          * something to count pending connection packets in. */
1244                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1245                                 dec_unacked(epoch->tconn);
1246 #endif
1247
1248                         if (tconn->current_epoch != epoch) {
1249                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1250                                 list_del(&epoch->list);
1251                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1252                                 tconn->epochs--;
1253                                 kfree(epoch);
1254
1255                                 if (rv == FE_STILL_LIVE)
1256                                         rv = FE_DESTROYED;
1257                         } else {
1258                                 epoch->flags = 0;
1259                                 atomic_set(&epoch->epoch_size, 0);
1260                                 /* atomic_set(&epoch->active, 0); is already zero */
1261                                 if (rv == FE_STILL_LIVE)
1262                                         rv = FE_RECYCLED;
1263                         }
1264                 }
1265
1266                 if (!next_epoch)
1267                         break;
1268
1269                 epoch = next_epoch;
1270         } while (1);
1271
1272         spin_unlock(&tconn->epoch_lock);
1273
1274         return rv;
1275 }
1276
1277 /**
1278  * drbd_bump_write_ordering() - Fall back to another write ordering method
1279  * @tconn:      DRBD connection.
1280  * @wo:         Write ordering method to try.
1281  */
1282 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1283 {
1284         struct disk_conf *dc;
1285         struct drbd_conf *mdev;
1286         enum write_ordering_e pwo;
1287         int vnr;
1288         static char *write_ordering_str[] = {
1289                 [WO_none] = "none",
1290                 [WO_drain_io] = "drain",
1291                 [WO_bdev_flush] = "flush",
1292         };
1293
1294         pwo = tconn->write_ordering;
1295         wo = min(pwo, wo);
1296         rcu_read_lock();
1297         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1298                 if (!get_ldev_if_state(mdev, D_ATTACHING))
1299                         continue;
1300                 dc = rcu_dereference(mdev->ldev->disk_conf);
1301
1302                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1303                         wo = WO_drain_io;
1304                 if (wo == WO_drain_io && !dc->disk_drain)
1305                         wo = WO_none;
1306                 put_ldev(mdev);
1307         }
1308         rcu_read_unlock();
1309         tconn->write_ordering = wo;
1310         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1311                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1312 }
1313
1314 /**
1315  * drbd_submit_peer_request() - Submit a peer request to the local backing device
1316  * @mdev:       DRBD device.
1317  * @peer_req:   peer request
1318  * @rw:         flag field, see bio->bi_rw
1319  *
1320  * May spread the pages to multiple bios,
1321  * depending on bio_add_page restrictions.
1322  *
1323  * Returns 0 if all bios have been submitted,
1324  * -ENOMEM if we could not allocate enough bios,
1325  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1326  *  single page to an empty bio (which should never happen and likely indicates
1327  *  that the lower level IO stack is in some way broken). This has been observed
1328  *  on certain Xen deployments.
1329  */
1330 /* TODO allocate from our own bio_set. */
1331 int drbd_submit_peer_request(struct drbd_conf *mdev,
1332                              struct drbd_peer_request *peer_req,
1333                              const unsigned rw, const int fault_type)
1334 {
1335         struct bio *bios = NULL;
1336         struct bio *bio;
1337         struct page *page = peer_req->pages;
1338         sector_t sector = peer_req->i.sector;
1339         unsigned ds = peer_req->i.size;
1340         unsigned n_bios = 0;
1341         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1342         int err = -ENOMEM;
1343
1344         /* In most cases, we will only need one bio.  But in case the lower
1345          * level restrictions happen to be different at this offset on this
1346          * side than those of the sending peer, we may need to submit the
1347          * request in more than one bio.
1348          *
1349          * Plain bio_alloc is good enough here, this is no DRBD internally
1350          * generated bio, but a bio allocated on behalf of the peer.
1351          */
1352 next_bio:
1353         bio = bio_alloc(GFP_NOIO, nr_pages);
1354         if (!bio) {
1355                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1356                 goto fail;
1357         }
1358         /* > peer_req->i.sector, unless this is the first bio */
1359         bio->bi_sector = sector;
1360         bio->bi_bdev = mdev->ldev->backing_bdev;
1361         bio->bi_rw = rw;
1362         bio->bi_private = peer_req;
1363         bio->bi_end_io = drbd_peer_request_endio;
1364
1365         bio->bi_next = bios;
1366         bios = bio;
1367         ++n_bios;
1368
1369         page_chain_for_each(page) {
1370                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1371                 if (!bio_add_page(bio, page, len, 0)) {
1372                         /* A single page must always be possible!
1373                          * But in case it fails anyway,
1374                          * we deal with it, and complain (below). */
1375                         if (bio->bi_vcnt == 0) {
1376                                 dev_err(DEV,
1377                                         "bio_add_page failed for len=%u, "
1378                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1379                                         len, (unsigned long long)bio->bi_sector);
1380                                 err = -ENOSPC;
1381                                 goto fail;
1382                         }
1383                         goto next_bio;
1384                 }
1385                 ds -= len;
1386                 sector += len >> 9;
1387                 --nr_pages;
1388         }
1389         D_ASSERT(page == NULL);
1390         D_ASSERT(ds == 0);
1391
1392         atomic_set(&peer_req->pending_bios, n_bios);
1393         do {
1394                 bio = bios;
1395                 bios = bios->bi_next;
1396                 bio->bi_next = NULL;
1397
1398                 drbd_generic_make_request(mdev, fault_type, bio);
1399         } while (bios);
1400         return 0;
1401
1402 fail:
1403         while (bios) {
1404                 bio = bios;
1405                 bios = bios->bi_next;
1406                 bio_put(bio);
1407         }
1408         return err;
1409 }
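
/*
 * Hedged caller sketch, not one of the driver's actual submission paths:
 * mapping the documented return codes of drbd_submit_peer_request() to a
 * retry decision.  DRBD_FAULT_DT_WR is used purely as an example fault class.
 */
static inline bool peer_req_submit_retry_example(struct drbd_conf *mdev,
						 struct drbd_peer_request *peer_req)
{
	int err = drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_DT_WR);

	if (err == -ENOMEM)
		return true;	/* retry once bios can be allocated again */
	/* 0: all bios submitted; -ENOSPC: lower level broken, do not retry */
	return false;
}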
1410
1411 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1412                                              struct drbd_peer_request *peer_req)
1413 {
1414         struct drbd_interval *i = &peer_req->i;
1415
1416         drbd_remove_interval(&mdev->write_requests, i);
1417         drbd_clear_interval(i);
1418
1419         /* Wake up any processes waiting for this peer request to complete.  */
1420         if (i->waiting)
1421                 wake_up(&mdev->misc_wait);
1422 }
1423
1424 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1425 {
1426         struct drbd_conf *mdev;
1427         int vnr;
1428
1429         rcu_read_lock();
1430         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1431                 kref_get(&mdev->kref);
1432                 rcu_read_unlock();
1433                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1434                 kref_put(&mdev->kref, &drbd_minor_destroy);
1435                 rcu_read_lock();
1436         }
1437         rcu_read_unlock();
1438 }
1439
1440 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1441 {
1442         int rv;
1443         struct p_barrier *p = pi->data;
1444         struct drbd_epoch *epoch;
1445
1446         /* FIXME these are unacked on connection,
1447          * not a specific (peer)device.
1448          */
1449         tconn->current_epoch->barrier_nr = p->barrier;
1450         tconn->current_epoch->tconn = tconn;
1451         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1452
1453         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1454          * the activity log, which means it would not be resynced in case the
1455          * R_PRIMARY crashes now.
1456          * Therefore we must send the barrier_ack after the barrier request was
1457          * completed. */
1458         switch (tconn->write_ordering) {
1459         case WO_none:
1460                 if (rv == FE_RECYCLED)
1461                         return 0;
1462
1463                 /* receiver context, in the writeout path of the other node.
1464                  * avoid potential distributed deadlock */
1465                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1466                 if (epoch)
1467                         break;
1468                 else
1469                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1470                         /* Fall through */
1471
1472         case WO_bdev_flush:
1473         case WO_drain_io:
1474                 conn_wait_active_ee_empty(tconn);
1475                 drbd_flush(tconn);
1476
1477                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1478                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1479                         if (epoch)
1480                                 break;
1481                 }
1482
1483                 return 0;
1484         default:
1485                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1486                 return -EIO;
1487         }
1488
1489         epoch->flags = 0;
1490         atomic_set(&epoch->epoch_size, 0);
1491         atomic_set(&epoch->active, 0);
1492
1493         spin_lock(&tconn->epoch_lock);
1494         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1495                 list_add(&epoch->list, &tconn->current_epoch->list);
1496                 tconn->current_epoch = epoch;
1497                 tconn->epochs++;
1498         } else {
1499                 /* The current_epoch got recycled while we allocated this one... */
1500                 kfree(epoch);
1501         }
1502         spin_unlock(&tconn->epoch_lock);
1503
1504         return 0;
1505 }
1506
1507 /* used from receive_RSDataReply (recv_resync_read)
1508  * and from receive_Data */
1509 static struct drbd_peer_request *
1510 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1511               int data_size) __must_hold(local)
1512 {
1513         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1514         struct drbd_peer_request *peer_req;
1515         struct page *page;
1516         int dgs, ds, err;
1517         void *dig_in = mdev->tconn->int_dig_in;
1518         void *dig_vv = mdev->tconn->int_dig_vv;
1519         unsigned long *data;
1520
1521         dgs = 0;
1522         if (mdev->tconn->peer_integrity_tfm) {
1523                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1524                 /*
1525                  * FIXME: Receive the incoming digest into the receive buffer
1526                  *        here, together with its struct p_data?
1527                  */
1528                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1529                 if (err)
1530                         return NULL;
1531                 data_size -= dgs;
1532         }
1533
1534         if (!expect(IS_ALIGNED(data_size, 512)))
1535                 return NULL;
1536         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1537                 return NULL;
1538
1539         /* even though we trust our peer,
1540          * we sometimes have to double check. */
1541         if (sector + (data_size>>9) > capacity) {
1542                 dev_err(DEV, "request from peer beyond end of local disk: "
1543                         "capacity: %llus < sector: %llus + size: %u\n",
1544                         (unsigned long long)capacity,
1545                         (unsigned long long)sector, data_size);
1546                 return NULL;
1547         }
1548
1549         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1550          * "criss-cross" setup, that might cause write-out on some other DRBD,
1551          * which in turn might block on the other node at this very place.  */
1552         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1553         if (!peer_req)
1554                 return NULL;
1555
1556         if (!data_size)
1557                 return peer_req;
1558
1559         ds = data_size;
1560         page = peer_req->pages;
1561         page_chain_for_each(page) {
1562                 unsigned len = min_t(int, ds, PAGE_SIZE);
1563                 data = kmap(page);
1564                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1565                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1566                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1567                         data[0] = data[0] ^ (unsigned long)-1;
1568                 }
1569                 kunmap(page);
1570                 if (err) {
1571                         drbd_free_peer_req(mdev, peer_req);
1572                         return NULL;
1573                 }
1574                 ds -= len;
1575         }
1576
1577         if (dgs) {
1578                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1579                 if (memcmp(dig_in, dig_vv, dgs)) {
1580                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1581                                 (unsigned long long)sector, data_size);
1582                         drbd_free_peer_req(mdev, peer_req);
1583                         return NULL;
1584                 }
1585         }
1586         mdev->recv_cnt += data_size>>9;
1587         return peer_req;
1588 }
1589
1590 /* drbd_drain_block() just takes a data block
1591  * out of the socket input buffer, and discards it.
1592  */
1593 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1594 {
1595         struct page *page;
1596         int err = 0;
1597         void *data;
1598
1599         if (!data_size)
1600                 return 0;
1601
1602         page = drbd_alloc_pages(mdev, 1, 1);
1603
1604         data = kmap(page);
1605         while (data_size) {
1606                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1607
1608                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1609                 if (err)
1610                         break;
1611                 data_size -= len;
1612         }
1613         kunmap(page);
1614         drbd_free_pages(mdev, page, 0);
1615         return err;
1616 }
1617
1618 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1619                            sector_t sector, int data_size)
1620 {
1621         struct bio_vec *bvec;
1622         struct bio *bio;
1623         int dgs, err, i, expect;
1624         void *dig_in = mdev->tconn->int_dig_in;
1625         void *dig_vv = mdev->tconn->int_dig_vv;
1626
1627         dgs = 0;
1628         if (mdev->tconn->peer_integrity_tfm) {
1629                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1630                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1631                 if (err)
1632                         return err;
1633                 data_size -= dgs;
1634         }
1635
1636         /* optimistically update recv_cnt.  if receiving fails below,
1637          * we disconnect anyways, and counters will be reset. */
1638         mdev->recv_cnt += data_size>>9;
1639
1640         bio = req->master_bio;
1641         D_ASSERT(sector == bio->bi_sector);
1642
1643         bio_for_each_segment(bvec, bio, i) {
1644                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1645                 expect = min_t(int, data_size, bvec->bv_len);
1646                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1647                 kunmap(bvec->bv_page);
1648                 if (err)
1649                         return err;
1650                 data_size -= expect;
1651         }
1652
1653         if (dgs) {
1654                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1655                 if (memcmp(dig_in, dig_vv, dgs)) {
1656                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1657                         return -EINVAL;
1658                 }
1659         }
1660
1661         D_ASSERT(data_size == 0);
1662         return 0;
1663 }
1664
1665 /*
1666  * e_end_resync_block() is called in asender context via
1667  * drbd_finish_peer_reqs().
1668  */
1669 static int e_end_resync_block(struct drbd_work *w, int unused)
1670 {
1671         struct drbd_peer_request *peer_req =
1672                 container_of(w, struct drbd_peer_request, w);
1673         struct drbd_conf *mdev = w->mdev;
1674         sector_t sector = peer_req->i.sector;
1675         int err;
1676
1677         D_ASSERT(drbd_interval_empty(&peer_req->i));
1678
1679         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1680                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1681                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1682         } else {
1683                 /* Record failure to sync */
1684                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1685
1686                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1687         }
1688         dec_unacked(mdev);
1689
1690         return err;
1691 }
1692
1693 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1694 {
1695         struct drbd_peer_request *peer_req;
1696
1697         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1698         if (!peer_req)
1699                 goto fail;
1700
1701         dec_rs_pending(mdev);
1702
1703         inc_unacked(mdev);
1704         /* corresponding dec_unacked() in e_end_resync_block()
1705          * respective _drbd_clear_done_ee */
1706
1707         peer_req->w.cb = e_end_resync_block;
1708
1709         spin_lock_irq(&mdev->tconn->req_lock);
1710         list_add(&peer_req->w.list, &mdev->sync_ee);
1711         spin_unlock_irq(&mdev->tconn->req_lock);
1712
1713         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1714         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1715                 return 0;
1716
1717         /* don't care for the reason here */
1718         dev_err(DEV, "submit failed, triggering re-connect\n");
1719         spin_lock_irq(&mdev->tconn->req_lock);
1720         list_del(&peer_req->w.list);
1721         spin_unlock_irq(&mdev->tconn->req_lock);
1722
1723         drbd_free_peer_req(mdev, peer_req);
1724 fail:
1725         put_ldev(mdev);
1726         return -EIO;
1727 }
1728
1729 static struct drbd_request *
1730 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1731              sector_t sector, bool missing_ok, const char *func)
1732 {
1733         struct drbd_request *req;
1734
1735         /* Request object according to our peer */
1736         req = (struct drbd_request *)(unsigned long)id;
1737         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1738                 return req;
1739         if (!missing_ok) {
1740                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1741                         (unsigned long)id, (unsigned long long)sector);
1742         }
1743         return NULL;
1744 }
1745
1746 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1747 {
1748         struct drbd_conf *mdev;
1749         struct drbd_request *req;
1750         sector_t sector;
1751         int err;
1752         struct p_data *p = pi->data;
1753
1754         mdev = vnr_to_mdev(tconn, pi->vnr);
1755         if (!mdev)
1756                 return -EIO;
1757
1758         sector = be64_to_cpu(p->sector);
1759
1760         spin_lock_irq(&mdev->tconn->req_lock);
1761         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1762         spin_unlock_irq(&mdev->tconn->req_lock);
1763         if (unlikely(!req))
1764                 return -EIO;
1765
1766         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1767          * special casing it there for the various failure cases.
1768          * still no race with drbd_fail_pending_reads */
1769         err = recv_dless_read(mdev, req, sector, pi->size);
1770         if (!err)
1771                 req_mod(req, DATA_RECEIVED);
1772         /* else: nothing. handled from drbd_disconnect...
1773          * I don't think we may complete this just yet
1774          * in case we are "on-disconnect: freeze" */
1775
1776         return err;
1777 }
1778
1779 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1780 {
1781         struct drbd_conf *mdev;
1782         sector_t sector;
1783         int err;
1784         struct p_data *p = pi->data;
1785
1786         mdev = vnr_to_mdev(tconn, pi->vnr);
1787         if (!mdev)
1788                 return -EIO;
1789
1790         sector = be64_to_cpu(p->sector);
1791         D_ASSERT(p->block_id == ID_SYNCER);
1792
1793         if (get_ldev(mdev)) {
1794                 /* data is submitted to disk within recv_resync_read.
1795                  * corresponding put_ldev done below on error,
1796                  * or in drbd_peer_request_endio. */
1797                 err = recv_resync_read(mdev, sector, pi->size);
1798         } else {
1799                 if (__ratelimit(&drbd_ratelimit_state))
1800                         dev_err(DEV, "Can not write resync data to local disk.\n");
1801
1802                 err = drbd_drain_block(mdev, pi->size);
1803
1804                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1805         }
1806
1807         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1808
1809         return err;
1810 }
1811
1812 static void restart_conflicting_writes(struct drbd_conf *mdev,
1813                                        sector_t sector, int size)
1814 {
1815         struct drbd_interval *i;
1816         struct drbd_request *req;
1817
1818         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1819                 if (!i->local)
1820                         continue;
1821                 req = container_of(i, struct drbd_request, i);
1822                 if (req->rq_state & RQ_LOCAL_PENDING ||
1823                     !(req->rq_state & RQ_POSTPONED))
1824                         continue;
1825                 /* as it is RQ_POSTPONED, this will cause it to
1826                  * be queued on the retry workqueue. */
1827                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1828         }
1829 }
1830
1831 /*
1832  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1833  */
1834 static int e_end_block(struct drbd_work *w, int cancel)
1835 {
1836         struct drbd_peer_request *peer_req =
1837                 container_of(w, struct drbd_peer_request, w);
1838         struct drbd_conf *mdev = w->mdev;
1839         sector_t sector = peer_req->i.sector;
1840         int err = 0, pcmd;
1841
1842         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1843                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1844                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1845                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1846                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1847                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1848                         err = drbd_send_ack(mdev, pcmd, peer_req);
1849                         if (pcmd == P_RS_WRITE_ACK)
1850                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1851                 } else {
1852                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1853                         /* we expect it to be marked out of sync anyways...
1854                          * maybe assert this?  */
1855                 }
1856                 dec_unacked(mdev);
1857         }
1858         /* we delete from the conflict detection hash _after_ we sent out the
1859          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1860         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1861                 spin_lock_irq(&mdev->tconn->req_lock);
1862                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1863                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1864                 if (peer_req->flags & EE_RESTART_REQUESTS)
1865                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1866                 spin_unlock_irq(&mdev->tconn->req_lock);
1867         } else
1868                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1869
1870         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1871
1872         return err;
1873 }
1874
1875 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1876 {
1877         struct drbd_conf *mdev = w->mdev;
1878         struct drbd_peer_request *peer_req =
1879                 container_of(w, struct drbd_peer_request, w);
1880         int err;
1881
1882         err = drbd_send_ack(mdev, ack, peer_req);
1883         dec_unacked(mdev);
1884
1885         return err;
1886 }
1887
1888 static int e_send_superseded(struct drbd_work *w, int unused)
1889 {
1890         return e_send_ack(w, P_SUPERSEDED);
1891 }
1892
1893 static int e_send_retry_write(struct drbd_work *w, int unused)
1894 {
1895         struct drbd_tconn *tconn = w->mdev->tconn;
1896
1897         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1898                              P_RETRY_WRITE : P_SUPERSEDED);
1899 }
1900
1901 static bool seq_greater(u32 a, u32 b)
1902 {
1903         /*
1904          * We assume 32-bit wrap-around here.
1905          * For 24-bit wrap-around, we would have to shift:
1906          *  a <<= 8; b <<= 8;
1907          */
1908         return (s32)a - (s32)b > 0;
1909 }
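
/*
 * A small illustration of the wrap-around comparison above (example values,
 * not from a real trace): with a == 0x00000001 and b == 0xffffffff,
 * (s32)a - (s32)b == 1 - (-1) == 2 > 0, so seq_greater() correctly treats 1
 * as newer than 0xffffffff; the reverse call returns false.
 */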
1910
1911 static u32 seq_max(u32 a, u32 b)
1912 {
1913         return seq_greater(a, b) ? a : b;
1914 }
1915
1916 static bool need_peer_seq(struct drbd_conf *mdev)
1917 {
1918         struct drbd_tconn *tconn = mdev->tconn;
1919         int tp;
1920
1921         /*
1922          * We only need to keep track of the last packet_seq number of our peer
1923          * if we are in dual-primary mode and we have the resolve-conflicts flag set; see
1924          * handle_write_conflicts().
1925          */
1926
1927         rcu_read_lock();
1928         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1929         rcu_read_unlock();
1930
1931         return tp && test_bit(RESOLVE_CONFLICTS, &tconn->flags);
1932 }
1933
1934 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1935 {
1936         unsigned int newest_peer_seq;
1937
1938         if (need_peer_seq(mdev)) {
1939                 spin_lock(&mdev->peer_seq_lock);
1940                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1941                 mdev->peer_seq = newest_peer_seq;
1942                 spin_unlock(&mdev->peer_seq_lock);
1943                 /* wake up only if we actually changed mdev->peer_seq */
1944                 if (peer_seq == newest_peer_seq)
1945                         wake_up(&mdev->seq_wait);
1946         }
1947 }
1948
1949 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1950 {
1951         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1952 }
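
/*
 * Illustration only: start offsets are in sectors, lengths in bytes.  A
 * request at sector 0 of 4096 bytes covers sectors [0, 8), one at sector 4
 * of 4096 bytes covers [4, 12); neither ends before the other starts, so
 * overlaps(0, 4096, 4, 4096) is true, while overlaps(0, 4096, 8, 4096) is
 * not.
 */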
1953
1954 /* maybe change sync_ee into interval trees as well? */
1955 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1956 {
1957         struct drbd_peer_request *rs_req;
1958         bool rv = false;
1959
1960         spin_lock_irq(&mdev->tconn->req_lock);
1961         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1962                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1963                              rs_req->i.sector, rs_req->i.size)) {
1964                         rv = true;
1965                         break;
1966                 }
1967         }
1968         spin_unlock_irq(&mdev->tconn->req_lock);
1969
1970         return rv;
1971 }
1972
1973 /* Called from receive_Data.
1974  * Synchronize packets on sock with packets on msock.
1975  *
1976  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1977  * packet traveling on msock, they are still processed in the order they have
1978  * been sent.
1979  *
1980  * Note: we don't care for Ack packets overtaking P_DATA packets.
1981  *
1982  * In case packet_seq is larger than mdev->peer_seq number, there are
1983  * outstanding packets on the msock. We wait for them to arrive.
1984  * In case we are the logically next packet, we update mdev->peer_seq
1985  * ourselves. Correctly handles 32bit wrap around.
1986  *
1987  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1988  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1989  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1990  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1991  *
1992  * returns 0 if we may process the packet,
1993  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1994 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1995 {
1996         DEFINE_WAIT(wait);
1997         long timeout;
1998         int ret;
1999
2000         if (!need_peer_seq(mdev))
2001                 return 0;
2002
2003         spin_lock(&mdev->peer_seq_lock);
2004         for (;;) {
2005                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
2006                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
2007                         ret = 0;
2008                         break;
2009                 }
2010                 if (signal_pending(current)) {
2011                         ret = -ERESTARTSYS;
2012                         break;
2013                 }
2014                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
2015                 spin_unlock(&mdev->peer_seq_lock);
2016                 rcu_read_lock();
2017                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
2018                 rcu_read_unlock();
2019                 timeout = schedule_timeout(timeout);
2020                 spin_lock(&mdev->peer_seq_lock);
2021                 if (!timeout) {
2022                         ret = -ETIMEDOUT;
2023                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
2024                         break;
2025                 }
2026         }
2027         spin_unlock(&mdev->peer_seq_lock);
2028         finish_wait(&mdev->seq_wait, &wait);
2029         return ret;
2030 }
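
/*
 * Example of the sequencing above (illustrative values): assume
 * mdev->peer_seq == 3.  A write with peer_seq == 5 arriving first on the
 * data socket sleeps, because seq_greater(5 - 1, 3) is true, until the
 * missing packet has been processed and mdev->peer_seq has been advanced
 * (update_peer_seq() above does the wake_up).  A write with peer_seq == 4
 * passes immediately, since seq_greater(3, 3) is false, and bumps
 * mdev->peer_seq to 4 itself.
 */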
2031
2032 /* see also bio_flags_to_wire()
2033  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2034  * flags and back, since peers may run different kernel versions. */
2035 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2036 {
2037         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2038                 (dpf & DP_FUA ? REQ_FUA : 0) |
2039                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2040                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2041 }
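
/*
 * For instance (a sketch, not an exhaustive list): a peer that set
 * DP_RW_SYNC | DP_FUA in p_data->dp_flags makes receive_Data() below submit
 * with rw == WRITE | REQ_SYNC | REQ_FUA; flag bits we do not know about are
 * silently ignored.
 */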
2042
2043 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2044                                     unsigned int size)
2045 {
2046         struct drbd_interval *i;
2047
2048     repeat:
2049         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2050                 struct drbd_request *req;
2051                 struct bio_and_error m;
2052
2053                 if (!i->local)
2054                         continue;
2055                 req = container_of(i, struct drbd_request, i);
2056                 if (!(req->rq_state & RQ_POSTPONED))
2057                         continue;
2058                 req->rq_state &= ~RQ_POSTPONED;
2059                 __req_mod(req, NEG_ACKED, &m);
2060                 spin_unlock_irq(&mdev->tconn->req_lock);
2061                 if (m.bio)
2062                         complete_master_bio(mdev, &m);
2063                 spin_lock_irq(&mdev->tconn->req_lock);
2064                 goto repeat;
2065         }
2066 }
2067
2068 static int handle_write_conflicts(struct drbd_conf *mdev,
2069                                   struct drbd_peer_request *peer_req)
2070 {
2071         struct drbd_tconn *tconn = mdev->tconn;
2072         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2073         sector_t sector = peer_req->i.sector;
2074         const unsigned int size = peer_req->i.size;
2075         struct drbd_interval *i;
2076         bool equal;
2077         int err;
2078
2079         /*
2080          * Inserting the peer request into the write_requests tree will prevent
2081          * new conflicting local requests from being added.
2082          */
2083         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2084
2085     repeat:
2086         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2087                 if (i == &peer_req->i)
2088                         continue;
2089
2090                 if (!i->local) {
2091                         /*
2092                          * Our peer has sent a conflicting remote request; this
2093                          * should not happen in a two-node setup.  Wait for the
2094                          * earlier peer request to complete.
2095                          */
2096                         err = drbd_wait_misc(mdev, i);
2097                         if (err)
2098                                 goto out;
2099                         goto repeat;
2100                 }
2101
2102                 equal = i->sector == sector && i->size == size;
2103                 if (resolve_conflicts) {
2104                         /*
2105                          * If the peer request is fully contained within the
2106                          * overlapping request, it can be considered overwritten
2107                          * and thus superseded; otherwise, it will be retried
2108                          * once all overlapping requests have completed.
2109                          */
2110                         bool superseded = i->sector <= sector && i->sector +
2111                                        (i->size >> 9) >= sector + (size >> 9);
2112
2113                         if (!equal)
2114                                 dev_alert(DEV, "Concurrent writes detected: "
2115                                                "local=%llus +%u, remote=%llus +%u, "
2116                                                "assuming %s came first\n",
2117                                           (unsigned long long)i->sector, i->size,
2118                                           (unsigned long long)sector, size,
2119                                           superseded ? "local" : "remote");
2120
2121                         inc_unacked(mdev);
2122                         peer_req->w.cb = superseded ? e_send_superseded :
2123                                                    e_send_retry_write;
2124                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2125                         wake_asender(mdev->tconn);
2126
2127                         err = -ENOENT;
2128                         goto out;
2129                 } else {
2130                         struct drbd_request *req =
2131                                 container_of(i, struct drbd_request, i);
2132
2133                         if (!equal)
2134                                 dev_alert(DEV, "Concurrent writes detected: "
2135                                                "local=%llus +%u, remote=%llus +%u\n",
2136                                           (unsigned long long)i->sector, i->size,
2137                                           (unsigned long long)sector, size);
2138
2139                         if (req->rq_state & RQ_LOCAL_PENDING ||
2140                             !(req->rq_state & RQ_POSTPONED)) {
2141                                 /*
2142                                  * Wait for the node with the discard flag to
2143                                  * decide if this request has been superseded
2144                                  * or needs to be retried.
2145                                  * Requests that have been superseded will
2146                                  * disappear from the write_requests tree.
2147                                  *
2148                                  * In addition, wait for the conflicting
2149                                  * request to finish locally before submitting
2150                                  * the conflicting peer request.
2151                                  */
2152                                 err = drbd_wait_misc(mdev, &req->i);
2153                                 if (err) {
2154                                         _conn_request_state(mdev->tconn,
2155                                                             NS(conn, C_TIMEOUT),
2156                                                             CS_HARD);
2157                                         fail_postponed_requests(mdev, sector, size);
2158                                         goto out;
2159                                 }
2160                                 goto repeat;
2161                         }
2162                         /*
2163                          * Remember to restart the conflicting requests after
2164                          * the new peer request has completed.
2165                          */
2166                         peer_req->flags |= EE_RESTART_REQUESTS;
2167                 }
2168         }
2169         err = 0;
2170
2171     out:
2172         if (err)
2173                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2174         return err;
2175 }
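
/*
 * A worked example of the "superseded" containment test above (made-up
 * numbers): a local request at sector 0 with size 8192 covers sectors
 * [0, 16); a peer request at sector 4 with size 4096 covers [4, 12), is
 * fully contained and therefore answered with P_SUPERSEDED when
 * RESOLVE_CONFLICTS is set.  A peer request covering [12, 20) overlaps only
 * partially and is asked to retry instead.
 */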
2176
2177 /* mirrored write */
2178 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2179 {
2180         struct drbd_conf *mdev;
2181         sector_t sector;
2182         struct drbd_peer_request *peer_req;
2183         struct p_data *p = pi->data;
2184         u32 peer_seq = be32_to_cpu(p->seq_num);
2185         int rw = WRITE;
2186         u32 dp_flags;
2187         int err, tp;
2188
2189         mdev = vnr_to_mdev(tconn, pi->vnr);
2190         if (!mdev)
2191                 return -EIO;
2192
2193         if (!get_ldev(mdev)) {
2194                 int err2;
2195
2196                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2197                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2198                 atomic_inc(&tconn->current_epoch->epoch_size);
2199                 err2 = drbd_drain_block(mdev, pi->size);
2200                 if (!err)
2201                         err = err2;
2202                 return err;
2203         }
2204
2205         /*
2206          * Corresponding put_ldev done either below (on various errors), or in
2207          * drbd_peer_request_endio, if we successfully submit the data at the
2208          * end of this function.
2209          */
2210
2211         sector = be64_to_cpu(p->sector);
2212         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2213         if (!peer_req) {
2214                 put_ldev(mdev);
2215                 return -EIO;
2216         }
2217
2218         peer_req->w.cb = e_end_block;
2219
2220         dp_flags = be32_to_cpu(p->dp_flags);
2221         rw |= wire_flags_to_bio(mdev, dp_flags);
2222         if (peer_req->pages == NULL) {
2223                 D_ASSERT(peer_req->i.size == 0);
2224                 D_ASSERT(dp_flags & DP_FLUSH);
2225         }
2226
2227         if (dp_flags & DP_MAY_SET_IN_SYNC)
2228                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2229
2230         spin_lock(&tconn->epoch_lock);
2231         peer_req->epoch = tconn->current_epoch;
2232         atomic_inc(&peer_req->epoch->epoch_size);
2233         atomic_inc(&peer_req->epoch->active);
2234         spin_unlock(&tconn->epoch_lock);
2235
2236         rcu_read_lock();
2237         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2238         rcu_read_unlock();
2239         if (tp) {
2240                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2241                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2242                 if (err)
2243                         goto out_interrupted;
2244                 spin_lock_irq(&mdev->tconn->req_lock);
2245                 err = handle_write_conflicts(mdev, peer_req);
2246                 if (err) {
2247                         spin_unlock_irq(&mdev->tconn->req_lock);
2248                         if (err == -ENOENT) {
2249                                 put_ldev(mdev);
2250                                 return 0;
2251                         }
2252                         goto out_interrupted;
2253                 }
2254         } else
2255                 spin_lock_irq(&mdev->tconn->req_lock);
2256         list_add(&peer_req->w.list, &mdev->active_ee);
2257         spin_unlock_irq(&mdev->tconn->req_lock);
2258
2259         if (mdev->state.conn == C_SYNC_TARGET)
2260                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2261
2262         if (mdev->tconn->agreed_pro_version < 100) {
2263                 rcu_read_lock();
2264                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2265                 case DRBD_PROT_C:
2266                         dp_flags |= DP_SEND_WRITE_ACK;
2267                         break;
2268                 case DRBD_PROT_B:
2269                         dp_flags |= DP_SEND_RECEIVE_ACK;
2270                         break;
2271                 }
2272                 rcu_read_unlock();
2273         }
2274
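        /* At this point dp_flags encodes the replication guarantee: with
         * DP_SEND_WRITE_ACK (protocol C) the P_WRITE_ACK is only sent from
         * e_end_block() once the write has completed locally, with
         * DP_SEND_RECEIVE_ACK (protocol B) we acknowledge receipt right away
         * below, and with neither flag set (protocol A) no ack is sent for
         * this request at all. */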
2275         if (dp_flags & DP_SEND_WRITE_ACK) {
2276                 peer_req->flags |= EE_SEND_WRITE_ACK;
2277                 inc_unacked(mdev);
2278                 /* corresponding dec_unacked() in e_end_block()
2279                  * respective _drbd_clear_done_ee */
2280         }
2281
2282         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2283                 /* I really don't like it that the receiver thread
2284                  * sends on the msock, but anyways */
2285                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2286         }
2287
2288         if (mdev->state.pdsk < D_INCONSISTENT) {
2289                 /* In case we have the only disk of the cluster, */
2290                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2291                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2292                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2293                 drbd_al_begin_io(mdev, &peer_req->i);
2294         }
2295
2296         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2297         if (!err)
2298                 return 0;
2299
2300         /* don't care for the reason here */
2301         dev_err(DEV, "submit failed, triggering re-connect\n");
2302         spin_lock_irq(&mdev->tconn->req_lock);
2303         list_del(&peer_req->w.list);
2304         drbd_remove_epoch_entry_interval(mdev, peer_req);
2305         spin_unlock_irq(&mdev->tconn->req_lock);
2306         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2307                 drbd_al_complete_io(mdev, &peer_req->i);
2308
2309 out_interrupted:
2310         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2311         put_ldev(mdev);
2312         drbd_free_peer_req(mdev, peer_req);
2313         return err;
2314 }
2315
2316 /* We may throttle resync, if the lower device seems to be busy,
2317  * and current sync rate is above c_min_rate.
2318  *
2319  * To decide whether or not the lower device is busy, we use a scheme similar
2320  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2321  * (more than 64 sectors) of activity we cannot account for with our own resync
2322  * activity, it obviously is "busy".
2323  *
2324  * The current sync rate used here uses only the most recent two step marks,
2325  * to have a short time average so we can react faster.
2326  */
2327 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2328 {
2329         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2330         unsigned long db, dt, dbdt;
2331         struct lc_element *tmp;
2332         int curr_events;
2333         int throttle = 0;
2334         unsigned int c_min_rate;
2335
2336         rcu_read_lock();
2337         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2338         rcu_read_unlock();
2339
2340         /* feature disabled? */
2341         if (c_min_rate == 0)
2342                 return 0;
2343
2344         spin_lock_irq(&mdev->al_lock);
2345         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2346         if (tmp) {
2347                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2348                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2349                         spin_unlock_irq(&mdev->al_lock);
2350                         return 0;
2351                 }
2352                 /* Do not slow down if app IO is already waiting for this extent */
2353         }
2354         spin_unlock_irq(&mdev->al_lock);
2355
2356         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2357                       (int)part_stat_read(&disk->part0, sectors[1]) -
2358                         atomic_read(&mdev->rs_sect_ev);
2359
2360         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2361                 unsigned long rs_left;
2362                 int i;
2363
2364                 mdev->rs_last_events = curr_events;
2365
2366                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2367                  * approx. */
2368                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2369
2370                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2371                         rs_left = mdev->ov_left;
2372                 else
2373                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2374
2375                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2376                 if (!dt)
2377                         dt++;
2378                 db = mdev->rs_mark_left[i] - rs_left;
2379                 dbdt = Bit2KB(db/dt);
2380
2381                 if (dbdt > c_min_rate)
2382                         throttle = 1;
2383         }
2384         return throttle;
2385 }
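
/*
 * Rough, illustrative numbers for the throttle decision above: if over the
 * last two sync marks dt == 3 seconds and db == 25000 bitmap bits went
 * in-sync, then dbdt == Bit2KB(25000 / 3); assuming the usual 4 KiB of data
 * per bitmap bit that is about 33000 KiB/s, so with c_min_rate set to e.g.
 * 4000 KiB/s we would return "throttle".
 */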
2386
2387
2388 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2389 {
2390         struct drbd_conf *mdev;
2391         sector_t sector;
2392         sector_t capacity;
2393         struct drbd_peer_request *peer_req;
2394         struct digest_info *di = NULL;
2395         int size, verb;
2396         unsigned int fault_type;
2397         struct p_block_req *p = pi->data;
2398
2399         mdev = vnr_to_mdev(tconn, pi->vnr);
2400         if (!mdev)
2401                 return -EIO;
2402         capacity = drbd_get_capacity(mdev->this_bdev);
2403
2404         sector = be64_to_cpu(p->sector);
2405         size   = be32_to_cpu(p->blksize);
2406
2407         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2408                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2409                                 (unsigned long long)sector, size);
2410                 return -EINVAL;
2411         }
2412         if (sector + (size>>9) > capacity) {
2413                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2414                                 (unsigned long long)sector, size);
2415                 return -EINVAL;
2416         }
2417
2418         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2419                 verb = 1;
2420                 switch (pi->cmd) {
2421                 case P_DATA_REQUEST:
2422                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2423                         break;
2424                 case P_RS_DATA_REQUEST:
2425                 case P_CSUM_RS_REQUEST:
2426                 case P_OV_REQUEST:
2427                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2428                         break;
2429                 case P_OV_REPLY:
2430                         verb = 0;
2431                         dec_rs_pending(mdev);
2432                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2433                         break;
2434                 default:
2435                         BUG();
2436                 }
2437                 if (verb && __ratelimit(&drbd_ratelimit_state))
2438                         dev_err(DEV, "Can not satisfy peer's read request, "
2439                             "no local data.\n");
2440
2441                 /* drain the payload, if any */
2442                 return drbd_drain_block(mdev, pi->size);
2443         }
2444
2445         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2446          * "criss-cross" setup, that might cause write-out on some other DRBD,
2447          * which in turn might block on the other node at this very place.  */
2448         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2449         if (!peer_req) {
2450                 put_ldev(mdev);
2451                 return -ENOMEM;
2452         }
2453
2454         switch (pi->cmd) {
2455         case P_DATA_REQUEST:
2456                 peer_req->w.cb = w_e_end_data_req;
2457                 fault_type = DRBD_FAULT_DT_RD;
2458                 /* application IO, don't drbd_rs_begin_io */
2459                 goto submit;
2460
2461         case P_RS_DATA_REQUEST:
2462                 peer_req->w.cb = w_e_end_rsdata_req;
2463                 fault_type = DRBD_FAULT_RS_RD;
2464                 /* used in the sector offset progress display */
2465                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2466                 break;
2467
2468         case P_OV_REPLY:
2469         case P_CSUM_RS_REQUEST:
2470                 fault_type = DRBD_FAULT_RS_RD;
2471                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2472                 if (!di)
2473                         goto out_free_e;
2474
2475                 di->digest_size = pi->size;
2476                 di->digest = (((char *)di)+sizeof(struct digest_info));
2477
2478                 peer_req->digest = di;
2479                 peer_req->flags |= EE_HAS_DIGEST;
2480
2481                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2482                         goto out_free_e;
2483
2484                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2485                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2486                         peer_req->w.cb = w_e_end_csum_rs_req;
2487                         /* used in the sector offset progress display */
2488                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2489                 } else if (pi->cmd == P_OV_REPLY) {
2490                         /* track progress, we may need to throttle */
2491                         atomic_add(size >> 9, &mdev->rs_sect_in);
2492                         peer_req->w.cb = w_e_end_ov_reply;
2493                         dec_rs_pending(mdev);
2494                         /* drbd_rs_begin_io done when we sent this request,
2495                          * but accounting still needs to be done. */
2496                         goto submit_for_resync;
2497                 }
2498                 break;
2499
2500         case P_OV_REQUEST:
2501                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2502                     mdev->tconn->agreed_pro_version >= 90) {
2503                         unsigned long now = jiffies;
2504                         int i;
2505                         mdev->ov_start_sector = sector;
2506                         mdev->ov_position = sector;
2507                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2508                         mdev->rs_total = mdev->ov_left;
2509                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2510                                 mdev->rs_mark_left[i] = mdev->ov_left;
2511                                 mdev->rs_mark_time[i] = now;
2512                         }
2513                         dev_info(DEV, "Online Verify start sector: %llu\n",
2514                                         (unsigned long long)sector);
2515                 }
2516                 peer_req->w.cb = w_e_end_ov_req;
2517                 fault_type = DRBD_FAULT_RS_RD;
2518                 break;
2519
2520         default:
2521                 BUG();
2522         }
2523
2524         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2525          * wrt the receiver, but it is not as straightforward as it may seem.
2526          * Various places in the resync start and stop logic assume resync
2527          * requests are processed in order, requeuing this on the worker thread
2528          * introduces a bunch of new code for synchronization between threads.
2529          *
2530          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2531          * "forever", throttling after drbd_rs_begin_io will lock that extent
2532          * for application writes for the same time.  For now, just throttle
2533          * here, where the rest of the code expects the receiver to sleep for
2534          * a while, anyways.
2535          */
2536
2537         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2538          * this defers syncer requests for some time, before letting at least
2539          * one request through.  The resync controller on the receiving side
2540          * will adapt to the incoming rate accordingly.
2541          *
2542          * We cannot throttle here if remote is Primary/SyncTarget:
2543          * we would also throttle its application reads.
2544          * In that case, throttling is done on the SyncTarget only.
2545          */
2546         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2547                 schedule_timeout_uninterruptible(HZ/10);
2548         if (drbd_rs_begin_io(mdev, sector))
2549                 goto out_free_e;
2550
2551 submit_for_resync:
2552         atomic_add(size >> 9, &mdev->rs_sect_ev);
2553
2554 submit:
2555         inc_unacked(mdev);
2556         spin_lock_irq(&mdev->tconn->req_lock);
2557         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2558         spin_unlock_irq(&mdev->tconn->req_lock);
2559
2560         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2561                 return 0;
2562
2563         /* don't care for the reason here */
2564         dev_err(DEV, "submit failed, triggering re-connect\n");
2565         spin_lock_irq(&mdev->tconn->req_lock);
2566         list_del(&peer_req->w.list);
2567         spin_unlock_irq(&mdev->tconn->req_lock);
2568         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2569
2570 out_free_e:
2571         put_ldev(mdev);
2572         drbd_free_peer_req(mdev, peer_req);
2573         return -EIO;
2574 }
2575
2576 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2577 {
2578         int self, peer, rv = -100;
2579         unsigned long ch_self, ch_peer;
2580         enum drbd_after_sb_p after_sb_0p;
2581
2582         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2583         peer = mdev->p_uuid[UI_BITMAP] & 1;
2584
2585         ch_peer = mdev->p_uuid[UI_SIZE];
2586         ch_self = mdev->comm_bm_set;
2587
2588         rcu_read_lock();
2589         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2590         rcu_read_unlock();
2591         switch (after_sb_0p) {
2592         case ASB_CONSENSUS:
2593         case ASB_DISCARD_SECONDARY:
2594         case ASB_CALL_HELPER:
2595         case ASB_VIOLENTLY:
2596                 dev_err(DEV, "Configuration error.\n");
2597                 break;
2598         case ASB_DISCONNECT:
2599                 break;
2600         case ASB_DISCARD_YOUNGER_PRI:
2601                 if (self == 0 && peer == 1) {
2602                         rv = -1;
2603                         break;
2604                 }
2605                 if (self == 1 && peer == 0) {
2606                         rv =  1;
2607                         break;
2608                 }
2609                 /* Else fall through to one of the other strategies... */
2610         case ASB_DISCARD_OLDER_PRI:
2611                 if (self == 0 && peer == 1) {
2612                         rv = 1;
2613                         break;
2614                 }
2615                 if (self == 1 && peer == 0) {
2616                         rv = -1;
2617                         break;
2618                 }
2619                 /* Else fall through to one of the other strategies... */
2620                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2621                      "Using discard-least-changes instead\n");
2622         case ASB_DISCARD_ZERO_CHG:
2623                 if (ch_peer == 0 && ch_self == 0) {
2624                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2625                                 ? -1 : 1;
2626                         break;
2627                 } else {
2628                         if (ch_peer == 0) { rv =  1; break; }
2629                         if (ch_self == 0) { rv = -1; break; }
2630                 }
2631                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2632                         break;
2633         case ASB_DISCARD_LEAST_CHG:
2634                 if      (ch_self < ch_peer)
2635                         rv = -1;
2636                 else if (ch_self > ch_peer)
2637                         rv =  1;
2638                 else /* ( ch_self == ch_peer ) */
2639                      /* Well, then use something else. */
2640                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2641                                 ? -1 : 1;
2642                 break;
2643         case ASB_DISCARD_LOCAL:
2644                 rv = -1;
2645                 break;
2646         case ASB_DISCARD_REMOTE:
2647                 rv =  1;
2648         }
2649
2650         return rv;
2651 }
2652
2653 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2654 {
2655         int hg, rv = -100;
2656         enum drbd_after_sb_p after_sb_1p;
2657
2658         rcu_read_lock();
2659         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2660         rcu_read_unlock();
2661         switch (after_sb_1p) {
2662         case ASB_DISCARD_YOUNGER_PRI:
2663         case ASB_DISCARD_OLDER_PRI:
2664         case ASB_DISCARD_LEAST_CHG:
2665         case ASB_DISCARD_LOCAL:
2666         case ASB_DISCARD_REMOTE:
2667         case ASB_DISCARD_ZERO_CHG:
2668                 dev_err(DEV, "Configuration error.\n");
2669                 break;
2670         case ASB_DISCONNECT:
2671                 break;
2672         case ASB_CONSENSUS:
2673                 hg = drbd_asb_recover_0p(mdev);
2674                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2675                         rv = hg;
2676                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2677                         rv = hg;
2678                 break;
2679         case ASB_VIOLENTLY:
2680                 rv = drbd_asb_recover_0p(mdev);
2681                 break;
2682         case ASB_DISCARD_SECONDARY:
2683                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2684         case ASB_CALL_HELPER:
2685                 hg = drbd_asb_recover_0p(mdev);
2686                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2687                         enum drbd_state_rv rv2;
2688
2689                         drbd_set_role(mdev, R_SECONDARY, 0);
2690                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2691                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2692                           * we do not need to wait for the after state change work either. */
2693                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2694                         if (rv2 != SS_SUCCESS) {
2695                                 drbd_khelper(mdev, "pri-lost-after-sb");
2696                         } else {
2697                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2698                                 rv = hg;
2699                         }
2700                 } else
2701                         rv = hg;
2702         }
2703
2704         return rv;
2705 }
2706
2707 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2708 {
2709         int hg, rv = -100;
2710         enum drbd_after_sb_p after_sb_2p;
2711
2712         rcu_read_lock();
2713         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2714         rcu_read_unlock();
2715         switch (after_sb_2p) {
2716         case ASB_DISCARD_YOUNGER_PRI:
2717         case ASB_DISCARD_OLDER_PRI:
2718         case ASB_DISCARD_LEAST_CHG:
2719         case ASB_DISCARD_LOCAL:
2720         case ASB_DISCARD_REMOTE:
2721         case ASB_CONSENSUS:
2722         case ASB_DISCARD_SECONDARY:
2723         case ASB_DISCARD_ZERO_CHG:
2724                 dev_err(DEV, "Configuration error.\n");
2725                 break;
2726         case ASB_VIOLENTLY:
2727                 rv = drbd_asb_recover_0p(mdev);
2728                 break;
2729         case ASB_DISCONNECT:
2730                 break;
2731         case ASB_CALL_HELPER:
2732                 hg = drbd_asb_recover_0p(mdev);
2733                 if (hg == -1) {
2734                         enum drbd_state_rv rv2;
2735
2736                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2737                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2738                           * we do not need to wait for the after state change work either. */
2739                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2740                         if (rv2 != SS_SUCCESS) {
2741                                 drbd_khelper(mdev, "pri-lost-after-sb");
2742                         } else {
2743                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2744                                 rv = hg;
2745                         }
2746                 } else
2747                         rv = hg;
2748         }
2749
2750         return rv;
2751 }
2752
2753 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2754                            u64 bits, u64 flags)
2755 {
2756         if (!uuid) {
2757                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2758                 return;
2759         }
2760         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2761              text,
2762              (unsigned long long)uuid[UI_CURRENT],
2763              (unsigned long long)uuid[UI_BITMAP],
2764              (unsigned long long)uuid[UI_HISTORY_START],
2765              (unsigned long long)uuid[UI_HISTORY_END],
2766              (unsigned long long)bits,
2767              (unsigned long long)flags);
2768 }
2769
2770 /*
2771   100   after split brain try auto recover
2772     2   C_SYNC_SOURCE set BitMap
2773     1   C_SYNC_SOURCE use BitMap
2774     0   no Sync
2775    -1   C_SYNC_TARGET use BitMap
2776    -2   C_SYNC_TARGET set BitMap
2777  -100   after split brain, disconnect
2778 -1000   unrelated data
2779 -1091   requires proto 91
2780 -1096   requires proto 96
2781  */
2782 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2783 {
2784         u64 self, peer;
2785         int i, j;
2786
2787         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2788         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2789
2790         *rule_nr = 10;
2791         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2792                 return 0;
2793
2794         *rule_nr = 20;
2795         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2796              peer != UUID_JUST_CREATED)
2797                 return -2;
2798
2799         *rule_nr = 30;
2800         if (self != UUID_JUST_CREATED &&
2801             (peer == UUID_JUST_CREATED || peer == (u64)0))
2802                 return 2;
2803
2804         if (self == peer) {
2805                 int rct, dc; /* roles at crash time */
2806
2807                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2808
2809                         if (mdev->tconn->agreed_pro_version < 91)
2810                                 return -1091;
2811
2812                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2813                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2814                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2815                                 drbd_uuid_move_history(mdev);
2816                                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2817                                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2818
2819                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2820                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2821                                 *rule_nr = 34;
2822                         } else {
2823                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2824                                 *rule_nr = 36;
2825                         }
2826
2827                         return 1;
2828                 }
2829
2830                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2831
2832                         if (mdev->tconn->agreed_pro_version < 91)
2833                                 return -1091;
2834
2835                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2836                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2837                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2838
2839                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2840                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2841                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2842
2843                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2844                                 *rule_nr = 35;
2845                         } else {
2846                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2847                                 *rule_nr = 37;
2848                         }
2849
2850                         return -1;
2851                 }
2852
2853                 /* Common power [off|failure] */
2854                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2855                         (mdev->p_uuid[UI_FLAGS] & 2);
2856                 /* lowest bit is set when we were primary,
2857                  * next bit (weight 2) is set when peer was primary */
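                     /* e.g. if both nodes crashed while Primary, rct == 1 + 2 == 3
                      * and the tie is broken by the RESOLVE_CONFLICTS flag below */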
2858                 *rule_nr = 40;
2859
2860                 switch (rct) {
2861                 case 0: /* !self_pri && !peer_pri */ return 0;
2862                 case 1: /*  self_pri && !peer_pri */ return 1;
2863                 case 2: /* !self_pri &&  peer_pri */ return -1;
2864                 case 3: /*  self_pri &&  peer_pri */
2865                         dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2866                         return dc ? -1 : 1;
2867                 }
2868         }
2869
2870         *rule_nr = 50;
2871         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2872         if (self == peer)
2873                 return -1;
2874
2875         *rule_nr = 51;
2876         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2877         if (self == peer) {
2878                 if (mdev->tconn->agreed_pro_version < 96 ?
2879                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2880                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2881                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2882                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2883                            modifications from its last start of resync as sync source. */
2884
2885                         if (mdev->tconn->agreed_pro_version < 91)
2886                                 return -1091;
2887
2888                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2889                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2890
2891                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2892                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2893
2894                         return -1;
2895                 }
2896         }
2897
2898         *rule_nr = 60;
2899         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2900         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2901                 peer = mdev->p_uuid[i] & ~((u64)1);
2902                 if (self == peer)
2903                         return -2;
2904         }
2905
2906         *rule_nr = 70;
2907         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2908         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2909         if (self == peer)
2910                 return 1;
2911
2912         *rule_nr = 71;
2913         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2914         if (self == peer) {
2915                 if (mdev->tconn->agreed_pro_version < 96 ?
2916                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2917                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2918                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2919                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2920                            modifications from our last start of resync as sync source. */
2921
2922                         if (mdev->tconn->agreed_pro_version < 91)
2923                                 return -1091;
2924
2925                         __drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2926                         __drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2927
2928                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2929                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2930                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2931
2932                         return 1;
2933                 }
2934         }
2935
2936
2937         *rule_nr = 80;
2938         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2939         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2940                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2941                 if (self == peer)
2942                         return 2;
2943         }
2944
2945         *rule_nr = 90;
2946         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2947         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2948         if (self == peer && self != ((u64)0))
2949                 return 100;
2950
2951         *rule_nr = 100;
2952         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2953                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2954                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2955                         peer = mdev->p_uuid[j] & ~((u64)1);
2956                         if (self == peer)
2957                                 return -100;
2958                 }
2959         }
2960
2961         return -1000;
2962 }
2963
2964 /* drbd_sync_handshake() returns the new conn state on success, or
2965    C_MASK (-1) on failure.
2966  */
2967 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2968                                            enum drbd_disk_state peer_disk) __must_hold(local)
2969 {
2970         enum drbd_conns rv = C_MASK;
2971         enum drbd_disk_state mydisk;
2972         struct net_conf *nc;
2973         int hg, rule_nr, rr_conflict, tentative;
2974
2975         mydisk = mdev->state.disk;
2976         if (mydisk == D_NEGOTIATING)
2977                 mydisk = mdev->new_state_tmp.disk;
2978
2979         dev_info(DEV, "drbd_sync_handshake:\n");
2980
2981         spin_lock_irq(&mdev->ldev->md.uuid_lock);
2982         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2983         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2984                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2985
2986         hg = drbd_uuid_compare(mdev, &rule_nr);
2987         spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2988
2989         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2990
2991         if (hg == -1000) {
2992                 dev_alert(DEV, "Unrelated data, aborting!\n");
2993                 return C_MASK;
2994         }
2995         if (hg < -1000) {
2996                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2997                 return C_MASK;
2998         }
2999
3000         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3001             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3002                 int f = (hg == -100) || abs(hg) == 2;
3003                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3004                 if (f)
3005                         hg = hg*2;
3006                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
3007                      hg > 0 ? "source" : "target");
3008         }
3009
3010         if (abs(hg) == 100)
3011                 drbd_khelper(mdev, "initial-split-brain");
3012
3013         rcu_read_lock();
3014         nc = rcu_dereference(mdev->tconn->net_conf);
3015
3016         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3017                 int pcount = (mdev->state.role == R_PRIMARY)
3018                            + (peer_role == R_PRIMARY);
3019                 int forced = (hg == -100);
3020
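                     /* pcount == number of Primaries among the two nodes; it selects
                      * the matching after-sb-{0,1,2}pri recovery handler below. */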
3021                 switch (pcount) {
3022                 case 0:
3023                         hg = drbd_asb_recover_0p(mdev);
3024                         break;
3025                 case 1:
3026                         hg = drbd_asb_recover_1p(mdev);
3027                         break;
3028                 case 2:
3029                         hg = drbd_asb_recover_2p(mdev);
3030                         break;
3031                 }
3032                 if (abs(hg) < 100) {
3033                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
3034                              "automatically solved. Sync from %s node\n",
3035                              pcount, (hg < 0) ? "peer" : "this");
3036                         if (forced) {
3037                                 dev_warn(DEV, "Doing a full sync, since"
3038                                      " UUIDs were ambiguous.\n");
3039                                 hg = hg*2;
3040                         }
3041                 }
3042         }
3043
3044         if (hg == -100) {
3045                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3046                         hg = -1;
3047                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3048                         hg = 1;
3049
3050                 if (abs(hg) < 100)
3051                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3052                              "Sync from %s node\n",
3053                              (hg < 0) ? "peer" : "this");
3054         }
3055         rr_conflict = nc->rr_conflict;
3056         tentative = nc->tentative;
3057         rcu_read_unlock();
3058
3059         if (hg == -100) {
3060                 /* FIXME this log message is not correct if we end up here
3061                  * after an attempted attach on a diskless node.
3062                  * We just refuse to attach -- well, we drop the "connection"
3063                  * to that disk, in a way... */
3064                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3065                 drbd_khelper(mdev, "split-brain");
3066                 return C_MASK;
3067         }
3068
3069         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3070                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3071                 return C_MASK;
3072         }
3073
3074         if (hg < 0 && /* by intention we do not use mydisk here. */
3075             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3076                 switch (rr_conflict) {
3077                 case ASB_CALL_HELPER:
3078                         drbd_khelper(mdev, "pri-lost");
3079                         /* fall through */
3080                 case ASB_DISCONNECT:
3081                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3082                         return C_MASK;
3083                 case ASB_VIOLENTLY:
3084                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3085                              " assumption\n");
3086                 }
3087         }
3088
3089         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3090                 if (hg == 0)
3091                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3092                 else
3093                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3094                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3095                                  abs(hg) >= 2 ? "full" : "bit-map based");
3096                 return C_MASK;
3097         }
3098
3099         if (abs(hg) >= 2) {
3100                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3101                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3102                                         BM_LOCKED_SET_ALLOWED))
3103                         return C_MASK;
3104         }
3105
3106         if (hg > 0) { /* become sync source. */
3107                 rv = C_WF_BITMAP_S;
3108         } else if (hg < 0) { /* become sync target */
3109                 rv = C_WF_BITMAP_T;
3110         } else {
3111                 rv = C_CONNECTED;
3112                 if (drbd_bm_total_weight(mdev)) {
3113                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3114                              drbd_bm_total_weight(mdev));
3115                 }
3116         }
3117
3118         return rv;
3119 }
3120
3121 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3122 {
3123         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3124         if (peer == ASB_DISCARD_REMOTE)
3125                 return ASB_DISCARD_LOCAL;
3126
3127         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3128         if (peer == ASB_DISCARD_LOCAL)
3129                 return ASB_DISCARD_REMOTE;
3130
3131         /* everything else is valid if they are equal on both sides. */
3132         return peer;
3133 }
3134
3135 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3136 {
3137         struct p_protocol *p = pi->data;
3138         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3139         int p_proto, p_discard_my_data, p_two_primaries, cf;
3140         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3141         char integrity_alg[SHARED_SECRET_MAX] = "";
3142         struct crypto_hash *peer_integrity_tfm = NULL;
3143         void *int_dig_in = NULL, *int_dig_vv = NULL;
3144
3145         p_proto         = be32_to_cpu(p->protocol);
3146         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3147         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3148         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3149         p_two_primaries = be32_to_cpu(p->two_primaries);
3150         cf              = be32_to_cpu(p->conn_flags);
3151         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3152
3153         if (tconn->agreed_pro_version >= 87) {
3154                 int err;
3155
3156                 if (pi->size > sizeof(integrity_alg))
3157                         return -EIO;
3158                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3159                 if (err)
3160                         return err;
3161                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3162         }
3163
3164         if (pi->cmd != P_PROTOCOL_UPDATE) {
3165                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3166
3167                 if (cf & CF_DRY_RUN)
3168                         set_bit(CONN_DRY_RUN, &tconn->flags);
3169
3170                 rcu_read_lock();
3171                 nc = rcu_dereference(tconn->net_conf);
3172
3173                 if (p_proto != nc->wire_protocol) {
3174                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3175                         goto disconnect_rcu_unlock;
3176                 }
3177
3178                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3179                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3180                         goto disconnect_rcu_unlock;
3181                 }
3182
3183                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3184                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3185                         goto disconnect_rcu_unlock;
3186                 }
3187
3188                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3189                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3190                         goto disconnect_rcu_unlock;
3191                 }
3192
3193                 if (p_discard_my_data && nc->discard_my_data) {
3194                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3195                         goto disconnect_rcu_unlock;
3196                 }
3197
3198                 if (p_two_primaries != nc->two_primaries) {
3199                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3200                         goto disconnect_rcu_unlock;
3201                 }
3202
3203                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3204                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3205                         goto disconnect_rcu_unlock;
3206                 }
3207
3208                 rcu_read_unlock();
3209         }
3210
3211         if (integrity_alg[0]) {
3212                 int hash_size;
3213
3214                 /*
3215                  * We can only change the peer data integrity algorithm
3216                  * here.  Changing our own data integrity algorithm
3217                  * requires that we send a P_PROTOCOL_UPDATE packet at
3218                  * the same time; otherwise, the peer has no way to
3219                  * tell between which packets the algorithm should
3220                  * change.
3221                  */
3222
3223                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3224                 if (!peer_integrity_tfm) {
3225                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3226                                  integrity_alg);
3227                         goto disconnect;
3228                 }
3229
3230                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3231                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3232                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3233                 if (!(int_dig_in && int_dig_vv)) {
3234                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3235                         goto disconnect;
3236                 }
3237         }
3238
3239         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3240         if (!new_net_conf) {
3241                 conn_err(tconn, "Allocation of new net_conf failed\n");
3242                 goto disconnect;
3243         }
3244
3245         mutex_lock(&tconn->data.mutex);
3246         mutex_lock(&tconn->conf_update);
3247         old_net_conf = tconn->net_conf;
3248         *new_net_conf = *old_net_conf;
3249
3250         new_net_conf->wire_protocol = p_proto;
3251         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3252         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3253         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3254         new_net_conf->two_primaries = p_two_primaries;
3255
3256         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3257         mutex_unlock(&tconn->conf_update);
3258         mutex_unlock(&tconn->data.mutex);
3259
3260         crypto_free_hash(tconn->peer_integrity_tfm);
3261         kfree(tconn->int_dig_in);
3262         kfree(tconn->int_dig_vv);
3263         tconn->peer_integrity_tfm = peer_integrity_tfm;
3264         tconn->int_dig_in = int_dig_in;
3265         tconn->int_dig_vv = int_dig_vv;
3266
3267         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3268                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3269                           integrity_alg[0] ? integrity_alg : "(none)");
3270
3271         synchronize_rcu();
3272         kfree(old_net_conf);
3273         return 0;
3274
3275 disconnect_rcu_unlock:
3276         rcu_read_unlock();
3277 disconnect:
3278         crypto_free_hash(peer_integrity_tfm);
3279         kfree(int_dig_in);
3280         kfree(int_dig_vv);
3281         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3282         return -EIO;
3283 }
3284
3285 /* helper function
3286  * input: alg name, feature name
3287  * return: NULL (alg name was "")
3288  *         ERR_PTR(error) if something goes wrong
3289  *         or the crypto hash ptr, if it worked out ok. */
3290 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3291                 const char *alg, const char *name)
3292 {
3293         struct crypto_hash *tfm;
3294
3295         if (!alg[0])
3296                 return NULL;
3297
3298         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3299         if (IS_ERR(tfm)) {
3300                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3301                         alg, name, PTR_ERR(tfm));
3302                 return tfm;
3303         }
3304         return tfm;
3305 }
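
/* Typical use (see receive_SyncParam() below): callers must check the result
 * with IS_ERR() and treat a NULL return as "no algorithm configured". */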
3306
3307 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3308 {
3309         void *buffer = tconn->data.rbuf;
3310         int size = pi->size;
3311
3312         while (size) {
3313                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3314                 s = drbd_recv(tconn, buffer, s);
3315                 if (s <= 0) {
3316                         if (s < 0)
3317                                 return s;
3318                         break;
3319                 }
3320                 size -= s;
3321         }
3322         if (size)
3323                 return -EIO;
3324         return 0;
3325 }
3326
3327 /*
3328  * config_unknown_volume  -  device configuration command for unknown volume
3329  *
3330  * When a device is added to an existing connection, the node on which the
3331  * device is added first will send configuration commands to its peer but the
3332  * peer will not know about the device yet.  It will warn and ignore these
3333  * commands.  Once the device is added on the second node, the second node will
3334  * send the same device configuration commands, but in the other direction.
3335  *
3336  * (We can also end up here if drbd is misconfigured.)
3337  */
3338 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3339 {
3340         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3341                   cmdname(pi->cmd), pi->vnr);
3342         return ignore_remaining_packet(tconn, pi);
3343 }
3344
3345 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3346 {
3347         struct drbd_conf *mdev;
3348         struct p_rs_param_95 *p;
3349         unsigned int header_size, data_size, exp_max_sz;
3350         struct crypto_hash *verify_tfm = NULL;
3351         struct crypto_hash *csums_tfm = NULL;
3352         struct net_conf *old_net_conf, *new_net_conf = NULL;
3353         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3354         const int apv = tconn->agreed_pro_version;
3355         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3356         int fifo_size = 0;
3357         int err;
3358
3359         mdev = vnr_to_mdev(tconn, pi->vnr);
3360         if (!mdev)
3361                 return config_unknown_volume(tconn, pi);
3362
3363         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3364                     : apv == 88 ? sizeof(struct p_rs_param)
3365                                         + SHARED_SECRET_MAX
3366                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3367                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3368
3369         if (pi->size > exp_max_sz) {
3370                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3371                     pi->size, exp_max_sz);
3372                 return -EIO;
3373         }
3374
3375         if (apv <= 88) {
3376                 header_size = sizeof(struct p_rs_param);
3377                 data_size = pi->size - header_size;
3378         } else if (apv <= 94) {
3379                 header_size = sizeof(struct p_rs_param_89);
3380                 data_size = pi->size - header_size;
3381                 D_ASSERT(data_size == 0);
3382         } else {
3383                 header_size = sizeof(struct p_rs_param_95);
3384                 data_size = pi->size - header_size;
3385                 D_ASSERT(data_size == 0);
3386         }
3387
3388         /* initialize verify_alg and csums_alg */
3389         p = pi->data;
3390         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3391
3392         err = drbd_recv_all(mdev->tconn, p, header_size);
3393         if (err)
3394                 return err;
3395
3396         mutex_lock(&mdev->tconn->conf_update);
3397         old_net_conf = mdev->tconn->net_conf;
3398         if (get_ldev(mdev)) {
3399                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3400                 if (!new_disk_conf) {
3401                         put_ldev(mdev);
3402                         mutex_unlock(&mdev->tconn->conf_update);
3403                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3404                         return -ENOMEM;
3405                 }
3406
3407                 old_disk_conf = mdev->ldev->disk_conf;
3408                 *new_disk_conf = *old_disk_conf;
3409
3410                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3411         }
3412
3413         if (apv >= 88) {
3414                 if (apv == 88) {
3415                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3416                                 dev_err(DEV, "verify-alg of wrong size, "
3417                                         "peer wants %u, accepting only up to %u bytes\n",
3418                                         data_size, SHARED_SECRET_MAX);
3419                                 err = -EIO;
3420                                 goto reconnect;
3421                         }
3422
3423                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3424                         if (err)
3425                                 goto reconnect;
3426                         /* we expect NUL terminated string */
3427                         /* but just in case someone tries to be evil */
3428                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3429                         p->verify_alg[data_size-1] = 0;
3430
3431                 } else /* apv >= 89 */ {
3432                         /* we still expect NUL terminated strings */
3433                         /* but just in case someone tries to be evil */
3434                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3435                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3436                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3437                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3438                 }
3439
3440                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3441                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3442                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3443                                     old_net_conf->verify_alg, p->verify_alg);
3444                                 goto disconnect;
3445                         }
3446                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3447                                         p->verify_alg, "verify-alg");
3448                         if (IS_ERR(verify_tfm)) {
3449                                 verify_tfm = NULL;
3450                                 goto disconnect;
3451                         }
3452                 }
3453
3454                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3455                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3456                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3457                                     old_net_conf->csums_alg, p->csums_alg);
3458                                 goto disconnect;
3459                         }
3460                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3461                                         p->csums_alg, "csums-alg");
3462                         if (IS_ERR(csums_tfm)) {
3463                                 csums_tfm = NULL;
3464                                 goto disconnect;
3465                         }
3466                 }
3467
3468                 if (apv > 94 && new_disk_conf) {
3469                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3470                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3471                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3472                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3473
3474                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3475                         if (fifo_size != mdev->rs_plan_s->size) {
3476                                 new_plan = fifo_alloc(fifo_size);
3477                                 if (!new_plan) {
3478                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3479                                         put_ldev(mdev);
3480                                         goto disconnect;
3481                                 }
3482                         }
3483                 }
3484
3485                 if (verify_tfm || csums_tfm) {
3486                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3487                         if (!new_net_conf) {
3488                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3489                                 goto disconnect;
3490                         }
3491
3492                         *new_net_conf = *old_net_conf;
3493
3494                         if (verify_tfm) {
3495                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3496                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3497                                 crypto_free_hash(mdev->tconn->verify_tfm);
3498                                 mdev->tconn->verify_tfm = verify_tfm;
3499                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3500                         }
3501                         if (csums_tfm) {
3502                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3503                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3504                                 crypto_free_hash(mdev->tconn->csums_tfm);
3505                                 mdev->tconn->csums_tfm = csums_tfm;
3506                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3507                         }
3508                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3509                 }
3510         }
3511
3512         if (new_disk_conf) {
3513                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3514                 put_ldev(mdev);
3515         }
3516
3517         if (new_plan) {
3518                 old_plan = mdev->rs_plan_s;
3519                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3520         }
3521
3522         mutex_unlock(&mdev->tconn->conf_update);
3523         synchronize_rcu();
3524         if (new_net_conf)
3525                 kfree(old_net_conf);
3526         kfree(old_disk_conf);
3527         kfree(old_plan);
3528
3529         return 0;
3530
3531 reconnect:
3532         if (new_disk_conf) {
3533                 put_ldev(mdev);
3534                 kfree(new_disk_conf);
3535         }
3536         mutex_unlock(&mdev->tconn->conf_update);
3537         return -EIO;
3538
3539 disconnect:
3540         kfree(new_plan);
3541         if (new_disk_conf) {
3542                 put_ldev(mdev);
3543                 kfree(new_disk_conf);
3544         }
3545         mutex_unlock(&mdev->tconn->conf_update);
3546         /* just for completeness: actually not needed,
3547          * as this is not reached if csums_tfm was ok. */
3548         crypto_free_hash(csums_tfm);
3549         /* but free the verify_tfm again, if csums_tfm did not work out */
3550         crypto_free_hash(verify_tfm);
3551         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3552         return -EIO;
3553 }
3554
3555 /* warn if the arguments differ by more than 12.5% */
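/* (i.e. |a - b| > a/8 or |a - b| > b/8; e.g. a = 1000, b = 880 sectors:
 * d = 120 > 880>>3 = 110, so the warning is printed) */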
3556 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3557         const char *s, sector_t a, sector_t b)
3558 {
3559         sector_t d;
3560         if (a == 0 || b == 0)
3561                 return;
3562         d = (a > b) ? (a - b) : (b - a);
3563         if (d > (a>>3) || d > (b>>3))
3564                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3565                      (unsigned long long)a, (unsigned long long)b);
3566 }
3567
3568 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3569 {
3570         struct drbd_conf *mdev;
3571         struct p_sizes *p = pi->data;
3572         enum determine_dev_size dd = unchanged;
3573         sector_t p_size, p_usize, my_usize;
3574         int ldsc = 0; /* local disk size changed */
3575         enum dds_flags ddsf;
3576
3577         mdev = vnr_to_mdev(tconn, pi->vnr);
3578         if (!mdev)
3579                 return config_unknown_volume(tconn, pi);
3580
3581         p_size = be64_to_cpu(p->d_size);
3582         p_usize = be64_to_cpu(p->u_size);
3583
3584         /* just store the peer's disk size for now.
3585          * we still need to figure out whether we accept that. */
3586         mdev->p_size = p_size;
3587
3588         if (get_ldev(mdev)) {
3589                 rcu_read_lock();
3590                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3591                 rcu_read_unlock();
3592
3593                 warn_if_differ_considerably(mdev, "lower level device sizes",
3594                            p_size, drbd_get_max_capacity(mdev->ldev));
3595                 warn_if_differ_considerably(mdev, "user requested size",
3596                                             p_usize, my_usize);
3597
3598                 /* if this is the first connect, or an otherwise expected
3599                  * param exchange, choose the minimum */
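                     /* (min_not_zero: a size of 0 means "not configured" and is
                      *  ignored, e.g. my_usize == 0, p_usize == 1000 -> 1000) */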
3600                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3601                         p_usize = min_not_zero(my_usize, p_usize);
3602
3603                 /* Never shrink a device with usable data during connect.
3604                    But allow online shrinking if we are connected. */
3605                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3606                     drbd_get_capacity(mdev->this_bdev) &&
3607                     mdev->state.disk >= D_OUTDATED &&
3608                     mdev->state.conn < C_CONNECTED) {
3609                         dev_err(DEV, "The peer's disk size is too small!\n");
3610                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3611                         put_ldev(mdev);
3612                         return -EIO;
3613                 }
3614
3615                 if (my_usize != p_usize) {
3616                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3617
3618                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3619                         if (!new_disk_conf) {
3620                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3621                                 put_ldev(mdev);
3622                                 return -ENOMEM;
3623                         }
3624
3625                         mutex_lock(&mdev->tconn->conf_update);
3626                         old_disk_conf = mdev->ldev->disk_conf;
3627                         *new_disk_conf = *old_disk_conf;
3628                         new_disk_conf->disk_size = p_usize;
3629
3630                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3631                         mutex_unlock(&mdev->tconn->conf_update);
3632                         synchronize_rcu();
3633                         kfree(old_disk_conf);
3634
3635                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3636                                  (unsigned long)p_usize);
3637                 }
3638
3639                 put_ldev(mdev);
3640         }
3641
3642         ddsf = be16_to_cpu(p->dds_flags);
3643         if (get_ldev(mdev)) {
3644                 dd = drbd_determine_dev_size(mdev, ddsf);
3645                 put_ldev(mdev);
3646                 if (dd == dev_size_error)
3647                         return -EIO;
3648                 drbd_md_sync(mdev);
3649         } else {
3650                 /* I am diskless, need to accept the peer's size. */
3651                 drbd_set_my_capacity(mdev, p_size);
3652         }
3653
3654         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3655         drbd_reconsider_max_bio_size(mdev);
3656
3657         if (get_ldev(mdev)) {
3658                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3659                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3660                         ldsc = 1;
3661                 }
3662
3663                 put_ldev(mdev);
3664         }
3665
3666         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3667                 if (be64_to_cpu(p->c_size) !=
3668                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3669                         /* we have different sizes, probably peer
3670                          * needs to know my new size... */
3671                         drbd_send_sizes(mdev, 0, ddsf);
3672                 }
3673                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3674                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3675                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3676                             mdev->state.disk >= D_INCONSISTENT) {
3677                                 if (ddsf & DDSF_NO_RESYNC)
3678                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3679                                 else
3680                                         resync_after_online_grow(mdev);
3681                         } else
3682                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3683                 }
3684         }
3685
3686         return 0;
3687 }
3688
3689 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3690 {
3691         struct drbd_conf *mdev;
3692         struct p_uuids *p = pi->data;
3693         u64 *p_uuid;
3694         int i, updated_uuids = 0;
3695
3696         mdev = vnr_to_mdev(tconn, pi->vnr);
3697         if (!mdev)
3698                 return config_unknown_volume(tconn, pi);
3699
3700         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3701
3702         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3703                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3704
3705         kfree(mdev->p_uuid);
3706         mdev->p_uuid = p_uuid;
3707
3708         if (mdev->state.conn < C_CONNECTED &&
3709             mdev->state.disk < D_INCONSISTENT &&
3710             mdev->state.role == R_PRIMARY &&
3711             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3712                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3713                     (unsigned long long)mdev->ed_uuid);
3714                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3715                 return -EIO;
3716         }
3717
3718         if (get_ldev(mdev)) {
3719                 int skip_initial_sync =
3720                         mdev->state.conn == C_CONNECTED &&
3721                         mdev->tconn->agreed_pro_version >= 90 &&
3722                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3723                         (p_uuid[UI_FLAGS] & 8);
3724                 if (skip_initial_sync) {
3725                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3726                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3727                                         "clear_n_write from receive_uuids",
3728                                         BM_LOCKED_TEST_ALLOWED);
3729                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3730                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3731                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3732                                         CS_VERBOSE, NULL);
3733                         drbd_md_sync(mdev);
3734                         updated_uuids = 1;
3735                 }
3736                 put_ldev(mdev);
3737         } else if (mdev->state.disk < D_INCONSISTENT &&
3738                    mdev->state.role == R_PRIMARY) {
3739                 /* I am a diskless primary, the peer just created a new current UUID
3740                    for me. */
3741                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3742         }
3743
3744         /* Before we test the disk state, wait until any possibly ongoing
3745            cluster-wide state change has finished. That is important if
3746            we are primary and are detaching from our disk. We need to see the
3747            new disk state... */
3748         mutex_lock(mdev->state_mutex);
3749         mutex_unlock(mdev->state_mutex);
3750         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3751                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3752
3753         if (updated_uuids)
3754                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3755
3756         return 0;
3757 }
3758
3759 /**
3760  * convert_state() - Converts the peer's view of the cluster state to our point of view
3761  * @ps:         The state as seen by the peer.
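 *
 * Roles and disk states are simply mirrored (our "peer" is the peer's "role",
 * our "pdsk" is the peer's "disk"), while asymmetric connection states such
 * as C_STARTING_SYNC_S/_T and C_VERIFY_S/_T are swapped via c_tab below.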
3762  */
3763 static union drbd_state convert_state(union drbd_state ps)
3764 {
3765         union drbd_state ms;
3766
3767         static enum drbd_conns c_tab[] = {
3768                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3769                 [C_CONNECTED] = C_CONNECTED,
3770
3771                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3772                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3773                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3774                 [C_VERIFY_S]       = C_VERIFY_T,
3775                 [C_MASK]   = C_MASK,
3776         };
3777
3778         ms.i = ps.i;
3779
3780         ms.conn = c_tab[ps.conn];
3781         ms.peer = ps.role;
3782         ms.role = ps.peer;
3783         ms.pdsk = ps.disk;
3784         ms.disk = ps.pdsk;
3785         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3786
3787         return ms;
3788 }
3789
3790 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3791 {
3792         struct drbd_conf *mdev;
3793         struct p_req_state *p = pi->data;
3794         union drbd_state mask, val;
3795         enum drbd_state_rv rv;
3796
3797         mdev = vnr_to_mdev(tconn, pi->vnr);
3798         if (!mdev)
3799                 return -EIO;
3800
3801         mask.i = be32_to_cpu(p->mask);
3802         val.i = be32_to_cpu(p->val);
3803
3804         if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3805             mutex_is_locked(mdev->state_mutex)) {
3806                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3807                 return 0;
3808         }
3809
3810         mask = convert_state(mask);
3811         val = convert_state(val);
3812
3813         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3814         drbd_send_sr_reply(mdev, rv);
3815
3816         drbd_md_sync(mdev);
3817
3818         return 0;
3819 }
3820
3821 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3822 {
3823         struct p_req_state *p = pi->data;
3824         union drbd_state mask, val;
3825         enum drbd_state_rv rv;
3826
3827         mask.i = be32_to_cpu(p->mask);
3828         val.i = be32_to_cpu(p->val);
3829
3830         if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3831             mutex_is_locked(&tconn->cstate_mutex)) {
3832                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3833                 return 0;
3834         }
3835
3836         mask = convert_state(mask);
3837         val = convert_state(val);
3838
3839         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3840         conn_send_sr_reply(tconn, rv);
3841
3842         return 0;
3843 }
3844
3845 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3846 {
3847         struct drbd_conf *mdev;
3848         struct p_state *p = pi->data;
3849         union drbd_state os, ns, peer_state;
3850         enum drbd_disk_state real_peer_disk;
3851         enum chg_state_flags cs_flags;
3852         int rv;
3853
3854         mdev = vnr_to_mdev(tconn, pi->vnr);
3855         if (!mdev)
3856                 return config_unknown_volume(tconn, pi);
3857
3858         peer_state.i = be32_to_cpu(p->state);
3859
3860         real_peer_disk = peer_state.disk;
3861         if (peer_state.disk == D_NEGOTIATING) {
3862                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3863                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3864         }
3865
3866         spin_lock_irq(&mdev->tconn->req_lock);
3867  retry:
3868         os = ns = drbd_read_state(mdev);
3869         spin_unlock_irq(&mdev->tconn->req_lock);
3870
3871         /* If some other part of the code (asender thread, timeout)
3872          * already decided to close the connection again,
3873          * we must not "re-establish" it here. */
3874         if (os.conn <= C_TEAR_DOWN)
3875                 return -ECONNRESET;
3876
3877         /* If this is the "end of sync" confirmation, usually the peer disk
3878          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3879          * set) resync started in PausedSyncT, or if the timing of pause-/
3880          * unpause-sync events has been "just right", the peer disk may
3881          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3882          */
3883         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3884             real_peer_disk == D_UP_TO_DATE &&
3885             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3886                 /* If we are (becoming) SyncSource, but peer is still in sync
3887                  * preparation, ignore its uptodate-ness to avoid flapping, it
3888                  * will change to inconsistent once the peer reaches active
3889                  * syncing states.
3890                  * It may have changed syncer-paused flags, however, so we
3891                  * cannot ignore this completely. */
3892                 if (peer_state.conn > C_CONNECTED &&
3893                     peer_state.conn < C_SYNC_SOURCE)
3894                         real_peer_disk = D_INCONSISTENT;
3895
3896                 /* if peer_state changes to connected at the same time,
3897                  * it explicitly notifies us that it finished resync.
3898                  * Maybe we should finish it up, too? */
3899                 else if (os.conn >= C_SYNC_SOURCE &&
3900                          peer_state.conn == C_CONNECTED) {
3901                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3902                                 drbd_resync_finished(mdev);
3903                         return 0;
3904                 }
3905         }
3906
3907         /* explicit verify finished notification, stop sector reached. */
3908         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3909             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3910                 ov_out_of_sync_print(mdev);
3911                 drbd_resync_finished(mdev);
3912                 return 0;
3913         }
3914
3915         /* peer says his disk is inconsistent, while we think it is uptodate,
3916          * and this happens while the peer still thinks we have a sync going on,
3917          * but we think we are already done with the sync.
3918          * We ignore this to avoid flapping pdsk.
3919          * This should not happen, if the peer is a recent version of drbd. */
3920         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3921             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3922                 real_peer_disk = D_UP_TO_DATE;
3923
3924         if (ns.conn == C_WF_REPORT_PARAMS)
3925                 ns.conn = C_CONNECTED;
3926
3927         if (peer_state.conn == C_AHEAD)
3928                 ns.conn = C_BEHIND;
3929
3930         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3931             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3932                 int cr; /* consider resync */
3933
3934                 /* if we established a new connection */
3935                 cr  = (os.conn < C_CONNECTED);
3936                 /* if we had an established connection
3937                  * and one of the nodes newly attaches a disk */
3938                 cr |= (os.conn == C_CONNECTED &&
3939                        (peer_state.disk == D_NEGOTIATING ||
3940                         os.disk == D_NEGOTIATING));
3941                 /* if we have both been inconsistent, and the peer has been
3942                  * forced to be UpToDate with --overwrite-data */
3943                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3944                 /* if we had been plain connected, and the admin requested to
3945                  * start a sync by "invalidate" or "invalidate-remote" */
3946                 cr |= (os.conn == C_CONNECTED &&
3947                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3948                                  peer_state.conn <= C_WF_BITMAP_T));
3949
3950                 if (cr)
3951                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3952
3953                 put_ldev(mdev);
3954                 if (ns.conn == C_MASK) {
3955                         ns.conn = C_CONNECTED;
3956                         if (mdev->state.disk == D_NEGOTIATING) {
3957                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3958                         } else if (peer_state.disk == D_NEGOTIATING) {
3959                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3960                                 peer_state.disk = D_DISKLESS;
3961                                 real_peer_disk = D_DISKLESS;
3962                         } else {
3963                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3964                                         return -EIO;
3965                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3966                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3967                                 return -EIO;
3968                         }
3969                 }
3970         }
3971
3972         spin_lock_irq(&mdev->tconn->req_lock);
3973         if (os.i != drbd_read_state(mdev).i)
3974                 goto retry;
3975         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3976         ns.peer = peer_state.role;
3977         ns.pdsk = real_peer_disk;
3978         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3979         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3980                 ns.disk = mdev->new_state_tmp.disk;
3981         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3982         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3983             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3984                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3985                    for temporary network outages! */
3986                 spin_unlock_irq(&mdev->tconn->req_lock);
3987                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3988                 tl_clear(mdev->tconn);
3989                 drbd_uuid_new_current(mdev);
3990                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3991                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3992                 return -EIO;
3993         }
3994         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3995         ns = drbd_read_state(mdev);
3996         spin_unlock_irq(&mdev->tconn->req_lock);
3997
3998         if (rv < SS_SUCCESS) {
3999                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4000                 return -EIO;
4001         }
4002
4003         if (os.conn > C_WF_REPORT_PARAMS) {
4004                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4005                     peer_state.disk != D_NEGOTIATING ) {
4006                         /* we want resync, peer has not yet decided to sync... */
4007                         /* Nowadays only used when forcing a node into primary role and
4008                            setting its disk to UpToDate with that */
4009                         drbd_send_uuids(mdev);
4010                         drbd_send_current_state(mdev);
4011                 }
4012         }
4013
4014         clear_bit(DISCARD_MY_DATA, &mdev->flags);
4015
4016         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
4017
4018         return 0;
4019 }
4020
4021 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
4022 {
4023         struct drbd_conf *mdev;
4024         struct p_rs_uuid *p = pi->data;
4025
4026         mdev = vnr_to_mdev(tconn, pi->vnr);
4027         if (!mdev)
4028                 return -EIO;
4029
4030         wait_event(mdev->misc_wait,
4031                    mdev->state.conn == C_WF_SYNC_UUID ||
4032                    mdev->state.conn == C_BEHIND ||
4033                    mdev->state.conn < C_CONNECTED ||
4034                    mdev->state.disk < D_NEGOTIATING);
4035
4036         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4037
4038         /* Here the _drbd_uuid_ functions are right, current should
4039            _not_ be rotated into the history */
4040         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4041                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4042                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4043
4044                 drbd_print_uuids(mdev, "updated sync uuid");
4045                 drbd_start_resync(mdev, C_SYNC_TARGET);
4046
4047                 put_ldev(mdev);
4048         } else
4049                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4050
4051         return 0;
4052 }
4053
4054 /**
4055  * receive_bitmap_plain
4056  *
4057  * Return 0 when done, 1 when another iteration is needed, and a negative error
4058  * code upon failure.
4059  */
4060 static int
4061 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4062                      unsigned long *p, struct bm_xfer_ctx *c)
4063 {
4064         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4065                                  drbd_header_size(mdev->tconn);
4066         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4067                                        c->bm_words - c->word_offset);
4068         unsigned int want = num_words * sizeof(*p);
4069         int err;
4070
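             /* With illustrative numbers: assuming a 4096 byte socket buffer,
              * an 8 byte header and 64 bit longs, data_size is 4088 and a full
              * P_BITMAP packet is expected to carry 511 words (4088 bytes). */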
4071         if (want != size) {
4072                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4073                 return -EIO;
4074         }
4075         if (want == 0)
4076                 return 0;
4077         err = drbd_recv_all(mdev->tconn, p, want);
4078         if (err)
4079                 return err;
4080
4081         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4082
4083         c->word_offset += num_words;
4084         c->bit_offset = c->word_offset * BITS_PER_LONG;
4085         if (c->bit_offset > c->bm_bits)
4086                 c->bit_offset = c->bm_bits;
4087
4088         return 1;
4089 }
4090
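     /* The single "encoding" byte of a compressed bitmap packet packs three
      * fields, extracted by the helpers below:
      *   bits 0-3: the bitmap encoding (enum drbd_bitmap_code),
      *   bits 4-6: number of pad bits at the end of the bit stream,
      *   bit    7: value of the first run ("start" toggle) for the RLE decoder. */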
4091 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4092 {
4093         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4094 }
4095
4096 static int dcbp_get_start(struct p_compressed_bm *p)
4097 {
4098         return (p->encoding & 0x80) != 0;
4099 }
4100
4101 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4102 {
4103         return (p->encoding >> 4) & 0x7;
4104 }
4105
4106 /**
4107  * recv_bm_rle_bits
4108  *
4109  * Return 0 when done, 1 when another iteration is needed, and a negative error
4110  * code upon failure.
4111  */
4112 static int
4113 recv_bm_rle_bits(struct drbd_conf *mdev,
4114                 struct p_compressed_bm *p,
4115                  struct bm_xfer_ctx *c,
4116                  unsigned int len)
4117 {
4118         struct bitstream bs;
4119         u64 look_ahead;
4120         u64 rl;
4121         u64 tmp;
4122         unsigned long s = c->bit_offset;
4123         unsigned long e;
4124         int toggle = dcbp_get_start(p);
4125         int have;
4126         int bits;
4127
4128         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4129
4130         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4131         if (bits < 0)
4132                 return -EIO;
4133
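             /* Decode the VLI run-length stream: each code yields the length of
              * one run, and "toggle" alternates between runs of clear and runs
              * of set bits, so only every other run touches the bitmap. */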
4134         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4135                 bits = vli_decode_bits(&rl, look_ahead);
4136                 if (bits <= 0)
4137                         return -EIO;
4138
4139                 if (toggle) {
4140                         e = s + rl - 1;
4141                         if (e >= c->bm_bits) {
4142                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4143                                 return -EIO;
4144                         }
4145                         _drbd_bm_set_bits(mdev, s, e);
4146                 }
4147
4148                 if (have < bits) {
4149                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4150                                 have, bits, look_ahead,
4151                                 (unsigned int)(bs.cur.b - p->code),
4152                                 (unsigned int)bs.buf_len);
4153                         return -EIO;
4154                 }
4155                 look_ahead >>= bits;
4156                 have -= bits;
4157
4158                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4159                 if (bits < 0)
4160                         return -EIO;
4161                 look_ahead |= tmp << have;
4162                 have += bits;
4163         }
4164
4165         c->bit_offset = s;
4166         bm_xfer_ctx_bit_to_word_offset(c);
4167
4168         return (s != c->bm_bits);
4169 }
4170
4171 /**
4172  * decode_bitmap_c
4173  *
4174  * Return 0 when done, 1 when another iteration is needed, and a negative error
4175  * code upon failure.
4176  */
4177 static int
4178 decode_bitmap_c(struct drbd_conf *mdev,
4179                 struct p_compressed_bm *p,
4180                 struct bm_xfer_ctx *c,
4181                 unsigned int len)
4182 {
4183         if (dcbp_get_code(p) == RLE_VLI_Bits)
4184                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4185
4186         /* other variants had been implemented for evaluation,
4187          * but have been dropped as this one turned out to be "best"
4188          * during all our tests. */
4189
4190         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4191         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4192         return -EIO;
4193 }
4194
4195 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4196                 const char *direction, struct bm_xfer_ctx *c)
4197 {
4198         /* what would it take to transfer it "plaintext" */
4199         unsigned int header_size = drbd_header_size(mdev->tconn);
4200         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4201         unsigned int plain =
4202                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4203                 c->bm_words * sizeof(unsigned long);
4204         unsigned int total = c->bytes[0] + c->bytes[1];
4205         unsigned int r;
4206
4207         /* total cannot be zero, but just in case: */
4208         if (total == 0)
4209                 return;
4210
4211         /* don't report if not compressed */
4212         if (total >= plain)
4213                 return;
4214
4215         /* total < plain. check for overflow, still */
4216         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4217                                     : (1000 * total / plain);
4218
4219         if (r > 1000)
4220                 r = 1000;
4221
4222         r = 1000 - r;
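             /* With illustrative numbers: plain = 100000 bytes and total = 2500
              * bytes yield r = 975, reported below as "compression: 97.5%". */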
4223         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4224              "total %u; compression: %u.%u%%\n",
4225                         direction,
4226                         c->bytes[1], c->packets[1],
4227                         c->bytes[0], c->packets[0],
4228                         total, r/10, r % 10);
4229 }
4230
4231 /* Since we are processing the bitfield from lower addresses to higher,
4232    it does not matter whether we process it in 32 bit chunks or 64 bit
4233    chunks, as long as it is little endian. (Understand it as a byte stream,
4234    beginning with the lowest byte...) If we used big endian,
4235    we would need to process it from the highest address to the lowest,
4236    in order to be agnostic to the 32 vs 64 bit issue.
4237
4238    returns 0 on failure, 1 if we successfully received it. */
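     /* (Illustration: bit 35 of the bitmap always lives in byte 4, bit 3, of
      * that byte stream, whether the stream is read in 32 bit or in 64 bit
      * little endian words.) */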
4239 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4240 {
4241         struct drbd_conf *mdev;
4242         struct bm_xfer_ctx c;
4243         int err;
4244
4245         mdev = vnr_to_mdev(tconn, pi->vnr);
4246         if (!mdev)
4247                 return -EIO;
4248
4249         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4250         /* you are supposed to send additional out-of-sync information
4251          * if you actually set bits during this phase */
4252
4253         c = (struct bm_xfer_ctx) {
4254                 .bm_bits = drbd_bm_bits(mdev),
4255                 .bm_words = drbd_bm_words(mdev),
4256         };
4257
4258         for(;;) {
4259                 if (pi->cmd == P_BITMAP)
4260                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4261                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4262                         /* MAYBE: sanity check that we speak proto >= 90,
4263                          * and the feature is enabled! */
4264                         struct p_compressed_bm *p = pi->data;
4265
4266                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4267                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4268                                 err = -EIO;
4269                                 goto out;
4270                         }
4271                         if (pi->size <= sizeof(*p)) {
4272                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4273                                 err = -EIO;
4274                                 goto out;
4275                         }
4276                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4277                         if (err)
4278                                goto out;
4279                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4280                 } else {
4281                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4282                         err = -EIO;
4283                         goto out;
4284                 }
4285
4286                 c.packets[pi->cmd == P_BITMAP]++;
4287                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4288
4289                 if (err <= 0) {
4290                         if (err < 0)
4291                                 goto out;
4292                         break;
4293                 }
4294                 err = drbd_recv_header(mdev->tconn, pi);
4295                 if (err)
4296                         goto out;
4297         }
4298
4299         INFO_bm_xfer_stats(mdev, "receive", &c);
4300
4301         if (mdev->state.conn == C_WF_BITMAP_T) {
4302                 enum drbd_state_rv rv;
4303
4304                 err = drbd_send_bitmap(mdev);
4305                 if (err)
4306                         goto out;
4307                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4308                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4309                 D_ASSERT(rv == SS_SUCCESS);
4310         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4311                 /* admin may have requested C_DISCONNECTING,
4312                  * other threads may have noticed network errors */
4313                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4314                     drbd_conn_str(mdev->state.conn));
4315         }
4316         err = 0;
4317
4318  out:
4319         drbd_bm_unlock(mdev);
4320         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4321                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4322         return err;
4323 }
4324
4325 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4326 {
4327         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4328                  pi->cmd, pi->size);
4329
4330         return ignore_remaining_packet(tconn, pi);
4331 }
4332
4333 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4334 {
4335         /* Make sure we've acked all the TCP data associated
4336          * with the data requests being unplugged */
4337         drbd_tcp_quickack(tconn->data.socket);
4338
4339         return 0;
4340 }
4341
4342 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4343 {
4344         struct drbd_conf *mdev;
4345         struct p_block_desc *p = pi->data;
4346
4347         mdev = vnr_to_mdev(tconn, pi->vnr);
4348         if (!mdev)
4349                 return -EIO;
4350
4351         switch (mdev->state.conn) {
4352         case C_WF_SYNC_UUID:
4353         case C_WF_BITMAP_T:
4354         case C_BEHIND:
4355                 break;
4356         default:
4357                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4358                                 drbd_conn_str(mdev->state.conn));
4359         }
4360
4361         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4362
4363         return 0;
4364 }
4365
4366 struct data_cmd {
4367         int expect_payload;
4368         size_t pkt_size;
4369         int (*fn)(struct drbd_tconn *, struct packet_info *);
4370 };
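     /* pkt_size is the fixed sub-header that drbdd() reads into pi.data before
      * calling fn; expect_payload says whether additional payload beyond that
      * sub-header may follow. */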
4371
4372 static struct data_cmd drbd_cmd_handler[] = {
4373         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4374         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4375         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4376         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4377         [P_BITMAP]          = { 1, 0, receive_bitmap },
4378         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4379         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4380         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4381         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4382         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4383         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4384         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4385         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4386         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4387         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4388         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4389         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4390         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4391         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4392         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4393         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4394         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4395         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4396         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4397 };
4398
4399 static void drbdd(struct drbd_tconn *tconn)
4400 {
4401         struct packet_info pi;
4402         size_t shs; /* sub header size */
4403         int err;
4404
4405         while (get_t_state(&tconn->receiver) == RUNNING) {
4406                 struct data_cmd *cmd;
4407
4408                 drbd_thread_current_set_cpu(&tconn->receiver);
4409                 if (drbd_recv_header(tconn, &pi))
4410                         goto err_out;
4411
4412                 cmd = &drbd_cmd_handler[pi.cmd];
4413                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4414                         conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4415                                  cmdname(pi.cmd), pi.cmd);
4416                         goto err_out;
4417                 }
4418
4419                 shs = cmd->pkt_size;
4420                 if (pi.size > shs && !cmd->expect_payload) {
4421                         conn_err(tconn, "No payload expected %s l:%d\n",
4422                                  cmdname(pi.cmd), pi.size);
4423                         goto err_out;
4424                 }
4425
4426                 if (shs) {
4427                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4428                         if (err)
4429                                 goto err_out;
4430                         pi.size -= shs;
4431                 }
4432
4433                 err = cmd->fn(tconn, &pi);
4434                 if (err) {
4435                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4436                                  cmdname(pi.cmd), err, pi.size);
4437                         goto err_out;
4438                 }
4439         }
4440         return;
4441
4442     err_out:
4443         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4444 }
4445
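     /* Queue a no-op barrier work item on sender_work and wait for its
      * completion, so that all work queued before this call has been
      * processed by the worker. */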
4446 void conn_flush_workqueue(struct drbd_tconn *tconn)
4447 {
4448         struct drbd_wq_barrier barr;
4449
4450         barr.w.cb = w_prev_work_done;
4451         barr.w.tconn = tconn;
4452         init_completion(&barr.done);
4453         drbd_queue_work(&tconn->sender_work, &barr.w);
4454         wait_for_completion(&barr.done);
4455 }
4456
4457 static void conn_disconnect(struct drbd_tconn *tconn)
4458 {
4459         struct drbd_conf *mdev;
4460         enum drbd_conns oc;
4461         int vnr;
4462
4463         if (tconn->cstate == C_STANDALONE)
4464                 return;
4465
4466         /* We are about to start the cleanup after connection loss.
4467          * Make sure drbd_make_request knows about that.
4468          * Usually we should be in some network failure state already,
4469          * but just in case we are not, we fix it up here.
4470          */
4471         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4472
4473         /* asender does not clean up anything. It must not interfere, either. */
4474         drbd_thread_stop(&tconn->asender);
4475         drbd_free_sock(tconn);
4476
4477         rcu_read_lock();
4478         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4479                 kref_get(&mdev->kref);
4480                 rcu_read_unlock();
4481                 drbd_disconnected(mdev);
4482                 kref_put(&mdev->kref, &drbd_minor_destroy);
4483                 rcu_read_lock();
4484         }
4485         rcu_read_unlock();
4486
4487         if (!list_empty(&tconn->current_epoch->list))
4488                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4489         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4490         atomic_set(&tconn->current_epoch->epoch_size, 0);
4491         tconn->send.seen_any_write_yet = false;
4492
4493         conn_info(tconn, "Connection closed\n");
4494
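             /* If any volume is still Primary and the peer's disk is not
              * already known to be Outdated (or worse), try to fence the
              * peer asynchronously. */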
4495         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4496                 conn_try_outdate_peer_async(tconn);
4497
4498         spin_lock_irq(&tconn->req_lock);
4499         oc = tconn->cstate;
4500         if (oc >= C_UNCONNECTED)
4501                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4502
4503         spin_unlock_irq(&tconn->req_lock);
4504
4505         if (oc == C_DISCONNECTING)
4506                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4507 }
4508
4509 static int drbd_disconnected(struct drbd_conf *mdev)
4510 {
4511         unsigned int i;
4512
4513         /* wait for current activity to cease. */
4514         spin_lock_irq(&mdev->tconn->req_lock);
4515         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4516         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4517         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4518         spin_unlock_irq(&mdev->tconn->req_lock);
4519
4520         /* We do not have data structures that would allow us to
4521          * get the rs_pending_cnt down to 0 again.
4522          *  * On C_SYNC_TARGET we do not have any data structures describing
4523          *    the pending RSDataRequest's we have sent.
4524          *  * On C_SYNC_SOURCE there is no data structure that tracks
4525          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4526          *  And no, it is not the sum of the reference counts in the
4527          *  resync_LRU. The resync_LRU tracks the whole operation including
4528          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4529          *  on the fly. */
4530         drbd_rs_cancel_all(mdev);
4531         mdev->rs_total = 0;
4532         mdev->rs_failed = 0;
4533         atomic_set(&mdev->rs_pending_cnt, 0);
4534         wake_up(&mdev->misc_wait);
4535
4536         del_timer_sync(&mdev->resync_timer);
4537         resync_timer_fn((unsigned long)mdev);
4538
4539         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4540          * w_make_resync_request etc. which may still be on the worker queue
4541          * to be "canceled" */
4542         drbd_flush_workqueue(mdev);
4543
4544         drbd_finish_peer_reqs(mdev);
4545
4546         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4547            might have queued work again. The one before drbd_finish_peer_reqs() is
4548            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4549         drbd_flush_workqueue(mdev);
4550
4551         kfree(mdev->p_uuid);
4552         mdev->p_uuid = NULL;
4553
4554         if (!drbd_suspended(mdev))
4555                 tl_clear(mdev->tconn);
4556
4557         drbd_md_sync(mdev);
4558
4559         /* serialize with bitmap writeout triggered by the state change,
4560          * if any. */
4561         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4562
4563         /* tcp_close and release of sendpage pages can be deferred.  I don't
4564          * want to use SO_LINGER, because apparently it can be deferred for
4565          * more than 20 seconds (longest time I checked).
4566          *
4567          * Actually we don't care for exactly when the network stack does its
4568          * put_page(), but release our reference on these pages right here.
4569          */
4570         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4571         if (i)
4572                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4573         i = atomic_read(&mdev->pp_in_use_by_net);
4574         if (i)
4575                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4576         i = atomic_read(&mdev->pp_in_use);
4577         if (i)
4578                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4579
4580         D_ASSERT(list_empty(&mdev->read_ee));
4581         D_ASSERT(list_empty(&mdev->active_ee));
4582         D_ASSERT(list_empty(&mdev->sync_ee));
4583         D_ASSERT(list_empty(&mdev->done_ee));
4584
4585         return 0;
4586 }
4587
4588 /*
4589  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4590  * we can agree on is stored in agreed_pro_version.
4591  *
4592  * feature flags and the reserved array should be enough room for future
4593  * enhancements of the handshake protocol, and possible plugins...
4594  *
4595  * for now, they are expected to be zero, but ignored.
4596  */
4597 static int drbd_send_features(struct drbd_tconn *tconn)
4598 {
4599         struct drbd_socket *sock;
4600         struct p_connection_features *p;
4601
4602         sock = &tconn->data;
4603         p = conn_prepare_command(tconn, sock);
4604         if (!p)
4605                 return -EIO;
4606         memset(p, 0, sizeof(*p));
4607         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4608         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4609         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4610 }
4611
4612 /*
4613  * return values:
4614  *   1 yes, we have a valid connection
4615  *   0 oops, did not work out, please try again
4616  *  -1 peer talks different language,
4617  *     no point in trying again, please go standalone.
4618  */
4619 static int drbd_do_features(struct drbd_tconn *tconn)
4620 {
4621         /* ASSERT current == tconn->receiver ... */
4622         struct p_connection_features *p;
4623         const int expect = sizeof(struct p_connection_features);
4624         struct packet_info pi;
4625         int err;
4626
4627         err = drbd_send_features(tconn);
4628         if (err)
4629                 return 0;
4630
4631         err = drbd_recv_header(tconn, &pi);
4632         if (err)
4633                 return 0;
4634
4635         if (pi.cmd != P_CONNECTION_FEATURES) {
4636                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4637                          cmdname(pi.cmd), pi.cmd);
4638                 return -1;
4639         }
4640
4641         if (pi.size != expect) {
4642                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4643                      expect, pi.size);
4644                 return -1;
4645         }
4646
4647         p = pi.data;
4648         err = drbd_recv_all_warn(tconn, p, expect);
4649         if (err)
4650                 return 0;
4651
4652         p->protocol_min = be32_to_cpu(p->protocol_min);
4653         p->protocol_max = be32_to_cpu(p->protocol_max);
4654         if (p->protocol_max == 0)
4655                 p->protocol_max = p->protocol_min;
4656
4657         if (PRO_VERSION_MAX < p->protocol_min ||
4658             PRO_VERSION_MIN > p->protocol_max)
4659                 goto incompat;
4660
4661         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
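             /* With illustrative numbers: if we support protocols 86..101 and
              * the peer advertises 86..96, the ranges overlap and we agree on
              * protocol 96. */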
4662
4663         conn_info(tconn, "Handshake successful: "
4664              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4665
4666         return 1;
4667
4668  incompat:
4669         conn_err(tconn, "incompatible DRBD dialects: "
4670             "I support %d-%d, peer supports %d-%d\n",
4671             PRO_VERSION_MIN, PRO_VERSION_MAX,
4672             p->protocol_min, p->protocol_max);
4673         return -1;
4674 }
4675
4676 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4677 static int drbd_do_auth(struct drbd_tconn *tconn)
4678 {
4679         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4680         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4681         return -1;
4682 }
4683 #else
4684 #define CHALLENGE_LEN 64
4685
4686 /* Return value:
4687         1 - auth succeeded,
4688         0 - failed, try again (network error),
4689         -1 - auth failed, don't try again.
4690 */
4691
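     /* Challenge-response (CRAM-HMAC) handshake, as implemented below: each
      * side sends a random challenge, answers the peer's challenge with an
      * HMAC keyed by the shared secret, and verifies the peer's answer against
      * the HMAC it computes over its own challenge. */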
4692 static int drbd_do_auth(struct drbd_tconn *tconn)
4693 {
4694         struct drbd_socket *sock;
4695         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4696         struct scatterlist sg;
4697         char *response = NULL;
4698         char *right_response = NULL;
4699         char *peers_ch = NULL;
4700         unsigned int key_len;
4701         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4702         unsigned int resp_size;
4703         struct hash_desc desc;
4704         struct packet_info pi;
4705         struct net_conf *nc;
4706         int err, rv;
4707
4708         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4709
4710         rcu_read_lock();
4711         nc = rcu_dereference(tconn->net_conf);
4712         key_len = strlen(nc->shared_secret);
4713         memcpy(secret, nc->shared_secret, key_len);
4714         rcu_read_unlock();
4715
4716         desc.tfm = tconn->cram_hmac_tfm;
4717         desc.flags = 0;
4718
4719         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4720         if (rv) {
4721                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4722                 rv = -1;
4723                 goto fail;
4724         }
4725
4726         get_random_bytes(my_challenge, CHALLENGE_LEN);
4727
4728         sock = &tconn->data;
4729         if (!conn_prepare_command(tconn, sock)) {
4730                 rv = 0;
4731                 goto fail;
4732         }
4733         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4734                                 my_challenge, CHALLENGE_LEN);
4735         if (!rv)
4736                 goto fail;
4737
4738         err = drbd_recv_header(tconn, &pi);
4739         if (err) {
4740                 rv = 0;
4741                 goto fail;
4742         }
4743
4744         if (pi.cmd != P_AUTH_CHALLENGE) {
4745                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4746                          cmdname(pi.cmd), pi.cmd);
4747                 rv = 0;
4748                 goto fail;
4749         }
4750
4751         if (pi.size > CHALLENGE_LEN * 2) {
4752                 conn_err(tconn, "AuthChallenge payload too big.\n");
4753                 rv = -1;
4754                 goto fail;
4755         }
4756
4757         peers_ch = kmalloc(pi.size, GFP_NOIO);
4758         if (peers_ch == NULL) {
4759                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4760                 rv = -1;
4761                 goto fail;
4762         }
4763
4764         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4765         if (err) {
4766                 rv = 0;
4767                 goto fail;
4768         }
4769
4770         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4771         response = kmalloc(resp_size, GFP_NOIO);
4772         if (response == NULL) {
4773                 conn_err(tconn, "kmalloc of response failed\n");
4774                 rv = -1;
4775                 goto fail;
4776         }
4777
4778         sg_init_table(&sg, 1);
4779         sg_set_buf(&sg, peers_ch, pi.size);
4780
4781         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4782         if (rv) {
4783                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4784                 rv = -1;
4785                 goto fail;
4786         }
4787
4788         if (!conn_prepare_command(tconn, sock)) {
4789                 rv = 0;
4790                 goto fail;
4791         }
4792         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4793                                 response, resp_size);
4794         if (!rv)
4795                 goto fail;
4796
4797         err = drbd_recv_header(tconn, &pi);
4798         if (err) {
4799                 rv = 0;
4800                 goto fail;
4801         }
4802
4803         if (pi.cmd != P_AUTH_RESPONSE) {
4804                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4805                          cmdname(pi.cmd), pi.cmd);
4806                 rv = 0;
4807                 goto fail;
4808         }
4809
4810         if (pi.size != resp_size) {
4811                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4812                 rv = 0;
4813                 goto fail;
4814         }
4815
4816         err = drbd_recv_all_warn(tconn, response, resp_size);
4817         if (err) {
4818                 rv = 0;
4819                 goto fail;
4820         }
4821
4822         right_response = kmalloc(resp_size, GFP_NOIO);
4823         if (right_response == NULL) {
4824                 conn_err(tconn, "kmalloc of right_response failed\n");
4825                 rv = -1;
4826                 goto fail;
4827         }
4828
4829         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4830
4831         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4832         if (rv) {
4833                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4834                 rv = -1;
4835                 goto fail;
4836         }
4837
4838         rv = !memcmp(response, right_response, resp_size);
4839
4840         if (rv)
4841                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4842                      resp_size);
4843         else
4844                 rv = -1;
4845
4846  fail:
4847         kfree(peers_ch);
4848         kfree(response);
4849         kfree(right_response);
4850
4851         return rv;
4852 }
4853 #endif
4854
4855 int drbdd_init(struct drbd_thread *thi)
4856 {
4857         struct drbd_tconn *tconn = thi->tconn;
4858         int h;
4859
4860         conn_info(tconn, "receiver (re)started\n");
4861
4862         do {
4863                 h = conn_connect(tconn);
4864                 if (h == 0) {
4865                         conn_disconnect(tconn);
4866                         schedule_timeout_interruptible(HZ);
4867                 }
4868                 if (h == -1) {
4869                         conn_warn(tconn, "Discarding network configuration.\n");
4870                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4871                 }
4872         } while (h == 0);
4873
4874         if (h > 0)
4875                 drbdd(tconn);
4876
4877         conn_disconnect(tconn);
4878
4879         conn_info(tconn, "receiver terminated\n");
4880         return 0;
4881 }
4882
4883 /* ********* acknowledge sender ******** */
4884
4885 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4886 {
4887         struct p_req_state_reply *p = pi->data;
4888         int retcode = be32_to_cpu(p->retcode);
4889
4890         if (retcode >= SS_SUCCESS) {
4891                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4892         } else {
4893                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4894                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4895                          drbd_set_st_err_str(retcode), retcode);
4896         }
4897         wake_up(&tconn->ping_wait);
4898
4899         return 0;
4900 }
4901
4902 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4903 {
4904         struct drbd_conf *mdev;
4905         struct p_req_state_reply *p = pi->data;
4906         int retcode = be32_to_cpu(p->retcode);
4907
4908         mdev = vnr_to_mdev(tconn, pi->vnr);
4909         if (!mdev)
4910                 return -EIO;
4911
4912         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4913                 D_ASSERT(tconn->agreed_pro_version < 100);
4914                 return got_conn_RqSReply(tconn, pi);
4915         }
4916
4917         if (retcode >= SS_SUCCESS) {
4918                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4919         } else {
4920                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4921                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4922                         drbd_set_st_err_str(retcode), retcode);
4923         }
4924         wake_up(&mdev->state_wait);
4925
4926         return 0;
4927 }
4928
4929 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4930 {
4931         return drbd_send_ping_ack(tconn);
4932
4933 }
4934
4935 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4936 {
4937         /* restore idle timeout */
4938         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4939         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4940                 wake_up(&tconn->ping_wait);
4941
4942         return 0;
4943 }
4944
4945 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4946 {
4947         struct drbd_conf *mdev;
4948         struct p_block_ack *p = pi->data;
4949         sector_t sector = be64_to_cpu(p->sector);
4950         int blksize = be32_to_cpu(p->blksize);
4951
4952         mdev = vnr_to_mdev(tconn, pi->vnr);
4953         if (!mdev)
4954                 return -EIO;
4955
4956         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4957
4958         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4959
4960         if (get_ldev(mdev)) {
4961                 drbd_rs_complete_io(mdev, sector);
4962                 drbd_set_in_sync(mdev, sector, blksize);
4963                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4964                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4965                 put_ldev(mdev);
4966         }
4967         dec_rs_pending(mdev);
4968         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4969
4970         return 0;
4971 }
4972
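     /* Look up the request identified by (id, sector) in the given tree of
      * pending requests, feed the event "what" into its state machine, and
      * complete the master bio if that transition finished the request. */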
4973 static int
4974 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4975                               struct rb_root *root, const char *func,
4976                               enum drbd_req_event what, bool missing_ok)
4977 {
4978         struct drbd_request *req;
4979         struct bio_and_error m;
4980
4981         spin_lock_irq(&mdev->tconn->req_lock);
4982         req = find_request(mdev, root, id, sector, missing_ok, func);
4983         if (unlikely(!req)) {
4984                 spin_unlock_irq(&mdev->tconn->req_lock);
4985                 return -EIO;
4986         }
4987         __req_mod(req, what, &m);
4988         spin_unlock_irq(&mdev->tconn->req_lock);
4989
4990         if (m.bio)
4991                 complete_master_bio(mdev, &m);
4992         return 0;
4993 }
4994
4995 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4996 {
4997         struct drbd_conf *mdev;
4998         struct p_block_ack *p = pi->data;
4999         sector_t sector = be64_to_cpu(p->sector);
5000         int blksize = be32_to_cpu(p->blksize);
5001         enum drbd_req_event what;
5002
5003         mdev = vnr_to_mdev(tconn, pi->vnr);
5004         if (!mdev)
5005                 return -EIO;
5006
5007         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5008
5009         if (p->block_id == ID_SYNCER) {
5010                 drbd_set_in_sync(mdev, sector, blksize);
5011                 dec_rs_pending(mdev);
5012                 return 0;
5013         }
5014         switch (pi->cmd) {
5015         case P_RS_WRITE_ACK:
5016                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5017                 break;
5018         case P_WRITE_ACK:
5019                 what = WRITE_ACKED_BY_PEER;
5020                 break;
5021         case P_RECV_ACK:
5022                 what = RECV_ACKED_BY_PEER;
5023                 break;
5024         case P_SUPERSEDED:
5025                 what = CONFLICT_RESOLVED;
5026                 break;
5027         case P_RETRY_WRITE:
5028                 what = POSTPONE_WRITE;
5029                 break;
5030         default:
5031                 BUG();
5032         }
5033
5034         return validate_req_change_req_state(mdev, p->block_id, sector,
5035                                              &mdev->write_requests, __func__,
5036                                              what, false);
5037 }
5038
5039 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5040 {
5041         struct drbd_conf *mdev;
5042         struct p_block_ack *p = pi->data;
5043         sector_t sector = be64_to_cpu(p->sector);
5044         int size = be32_to_cpu(p->blksize);
5045         int err;
5046
5047         mdev = vnr_to_mdev(tconn, pi->vnr);
5048         if (!mdev)
5049                 return -EIO;
5050
5051         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5052
5053         if (p->block_id == ID_SYNCER) {
5054                 dec_rs_pending(mdev);
5055                 drbd_rs_failed_io(mdev, sector, size);
5056                 return 0;
5057         }
5058
5059         err = validate_req_change_req_state(mdev, p->block_id, sector,
5060                                             &mdev->write_requests, __func__,
5061                                             NEG_ACKED, true);
5062         if (err) {
5063                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5064                    The master bio might already be completed, therefore the
5065                    request is no longer in the collision hash. */
5066                 /* In Protocol B we might already have got a P_RECV_ACK
5067                    but then get a P_NEG_ACK afterwards. */
5068                 drbd_set_out_of_sync(mdev, sector, size);
5069         }
5070         return 0;
5071 }
5072
5073 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5074 {
5075         struct drbd_conf *mdev;
5076         struct p_block_ack *p = pi->data;
5077         sector_t sector = be64_to_cpu(p->sector);
5078
5079         mdev = vnr_to_mdev(tconn, pi->vnr);
5080         if (!mdev)
5081                 return -EIO;
5082
5083         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5084
5085         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5086             (unsigned long long)sector, be32_to_cpu(p->blksize));
5087
5088         return validate_req_change_req_state(mdev, p->block_id, sector,
5089                                              &mdev->read_requests, __func__,
5090                                              NEG_ACKED, false);
5091 }
5092
5093 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5094 {
5095         struct drbd_conf *mdev;
5096         sector_t sector;
5097         int size;
5098         struct p_block_ack *p = pi->data;
5099
5100         mdev = vnr_to_mdev(tconn, pi->vnr);
5101         if (!mdev)
5102                 return -EIO;
5103
5104         sector = be64_to_cpu(p->sector);
5105         size = be32_to_cpu(p->blksize);
5106
5107         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5108
5109         dec_rs_pending(mdev);
5110
5111         if (get_ldev_if_state(mdev, D_FAILED)) {
5112                 drbd_rs_complete_io(mdev, sector);
5113                 switch (pi->cmd) {
5114                 case P_NEG_RS_DREPLY:
5115                         drbd_rs_failed_io(mdev, sector, size);
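                             /* fall through */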
5116                 case P_RS_CANCEL:
5117                         break;
5118                 default:
5119                         BUG();
5120                 }
5121                 put_ldev(mdev);
5122         }
5123
5124         return 0;
5125 }
5126
5127 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5128 {
5129         struct p_barrier_ack *p = pi->data;
5130         struct drbd_conf *mdev;
5131         int vnr;
5132
5133         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5134
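             /* Volumes that went Ahead and have no application writes in
              * flight any more can now arm the timer that switches them back
              * to a resync source. */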
5135         rcu_read_lock();
5136         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5137                 if (mdev->state.conn == C_AHEAD &&
5138                     atomic_read(&mdev->ap_in_flight) == 0 &&
5139                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5140                         mdev->start_resync_timer.expires = jiffies + HZ;
5141                         add_timer(&mdev->start_resync_timer);
5142                 }
5143         }
5144         rcu_read_unlock();
5145
5146         return 0;
5147 }
5148
5149 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5150 {
5151         struct drbd_conf *mdev;
5152         struct p_block_ack *p = pi->data;
5153         struct drbd_work *w;
5154         sector_t sector;
5155         int size;
5156
5157         mdev = vnr_to_mdev(tconn, pi->vnr);
5158         if (!mdev)
5159                 return -EIO;
5160
5161         sector = be64_to_cpu(p->sector);
5162         size = be32_to_cpu(p->blksize);
5163
5164         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5165
5166         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5167                 drbd_ov_out_of_sync_found(mdev, sector, size);
5168         else
5169                 ov_out_of_sync_print(mdev);
5170
5171         if (!get_ldev(mdev))
5172                 return 0;
5173
5174         drbd_rs_complete_io(mdev, sector);
5175         dec_rs_pending(mdev);
5176
5177         --mdev->ov_left;
5178
5179         /* let's advance progress step marks only for every other megabyte */
5180         if ((mdev->ov_left & 0x200) == 0x200)
5181                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5182
5183         if (mdev->ov_left == 0) {
5184                 w = kmalloc(sizeof(*w), GFP_NOIO);
5185                 if (w) {
5186                         w->cb = w_ov_finished;
5187                         w->mdev = mdev;
5188                         drbd_queue_work(&mdev->tconn->sender_work, w);
5189                 } else {
5190                         dev_err(DEV, "kmalloc(w) failed.");
5191                         ov_out_of_sync_print(mdev);
5192                         drbd_resync_finished(mdev);
5193                 }
5194         }
5195         put_ldev(mdev);
5196         return 0;
5197 }
5198
5199 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5200 {
5201         return 0;
5202 }
5203
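     /* Drain the done_ee lists of all volumes, repeating until none of them
      * has completions pending; returns nonzero if finishing a peer request
      * failed. */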
5204 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5205 {
5206         struct drbd_conf *mdev;
5207         int vnr, not_empty = 0;
5208
5209         do {
5210                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5211                 flush_signals(current);
5212
5213                 rcu_read_lock();
5214                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5215                         kref_get(&mdev->kref);
5216                         rcu_read_unlock();
5217                         if (drbd_finish_peer_reqs(mdev)) {
5218                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5219                                 return 1;
5220                         }
5221                         kref_put(&mdev->kref, &drbd_minor_destroy);
5222                         rcu_read_lock();
5223                 }
5224                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5225
5226                 spin_lock_irq(&tconn->req_lock);
5227                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5228                         not_empty = !list_empty(&mdev->done_ee);
5229                         if (not_empty)
5230                                 break;
5231                 }
5232                 spin_unlock_irq(&tconn->req_lock);
5233                 rcu_read_unlock();
5234         } while (not_empty);
5235
5236         return 0;
5237 }
5238
5239 struct asender_cmd {
5240         size_t pkt_size;
5241         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5242 };
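     /* pkt_size is the exact payload size that must follow the meta-socket
      * header; drbd_asender() below rejects packets whose length differs. */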
5243
5244 static struct asender_cmd asender_tbl[] = {
5245         [P_PING]            = { 0, got_Ping },
5246         [P_PING_ACK]        = { 0, got_PingAck },
5247         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5248         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5249         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5250         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5251         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5252         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5253         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5254         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5255         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5256         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5257         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5258         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5259         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5260         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5261         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5262 };
5263
5264 int drbd_asender(struct drbd_thread *thi)
5265 {
5266         struct drbd_tconn *tconn = thi->tconn;
5267         struct asender_cmd *cmd = NULL;
5268         struct packet_info pi;
5269         int rv;
5270         void *buf    = tconn->meta.rbuf;
5271         int received = 0;
5272         unsigned int header_size = drbd_header_size(tconn);
5273         int expect   = header_size;
5274         bool ping_timeout_active = false;
5275         struct net_conf *nc;
5276         int ping_timeo, tcp_cork, ping_int;
5277
5278         current->policy = SCHED_RR;  /* Make this a realtime task! */
5279         current->rt_priority = 2;    /* more important than all other tasks */
5280
5281         while (get_t_state(thi) == RUNNING) {
5282                 drbd_thread_current_set_cpu(thi);
5283
5284                 rcu_read_lock();
5285                 nc = rcu_dereference(tconn->net_conf);
5286                 ping_timeo = nc->ping_timeo;
5287                 tcp_cork = nc->tcp_cork;
5288                 ping_int = nc->ping_int;
5289                 rcu_read_unlock();
5290
5291                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5292                         if (drbd_send_ping(tconn)) {
5293                                 conn_err(tconn, "drbd_send_ping has failed\n");
5294                                 goto reconnect;
5295                         }
5296                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5297                         ping_timeout_active = true;
5298                 }
5299
5300                 /* TODO: conditionally cork; it may hurt latency if we cork without
5301                    much to send */
5302                 if (tcp_cork)
5303                         drbd_tcp_cork(tconn->meta.socket);
5304                 if (tconn_finish_peer_reqs(tconn)) {
5305                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5306                         goto reconnect;
5307                 }
5308                 /* but unconditionally uncork unless disabled */
5309                 if (tcp_cork)
5310                         drbd_tcp_uncork(tconn->meta.socket);
5311
5312                 /* short circuit, recv_msg would return EINTR anyways. */
5313                 if (signal_pending(current))
5314                         continue;
5315
5316                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5317                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5318
5319                 flush_signals(current);
5320
5321                 /* Note:
5322                  * -EINTR        (on meta) we got a signal
5323                  * -EAGAIN       (on meta) rcvtimeo expired
5324                  * -ECONNRESET   other side closed the connection
5325                  * -ERESTARTSYS  (on data) we got a signal
5326                  * rv <  0       other than above: unexpected error!
5327                  * rv == expected: full header or command
5328                  * rv <  expected: "woken" by signal during receive
5329                  * rv == 0       : "connection shut down by peer"
5330                  */
5331                 if (likely(rv > 0)) {
5332                         received += rv;
5333                         buf      += rv;
5334                 } else if (rv == 0) {
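                             /* If we already sent the Disconnecting request
                              * (DISCONNECT_SENT), the peer closing the meta
                              * socket is expected; wait for our cstate to drop
                              * below C_WF_REPORT_PARAMS instead of treating
                              * this as a network failure. */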
5335                         if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
5336                                 long t;
5337                                 rcu_read_lock();
5338                                 t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
5339                                 rcu_read_unlock();
5340
5341                                 t = wait_event_timeout(tconn->ping_wait,
5342                                                        tconn->cstate < C_WF_REPORT_PARAMS,
5343                                                        t);
5344                                 if (t)
5345                                         break;
5346                         }
5347                         conn_err(tconn, "meta connection shut down by peer.\n");
5348                         goto reconnect;
5349                 } else if (rv == -EAGAIN) {
5350                         /* If the data socket received something meanwhile,
5351                          * that is good enough: peer is still alive. */
5352                         if (time_after(tconn->last_received,
5353                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5354                                 continue;
5355                         if (ping_timeout_active) {
5356                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5357                                 goto reconnect;
5358                         }
5359                         set_bit(SEND_PING, &tconn->flags);
5360                         continue;
5361                 } else if (rv == -EINTR) {
5362                         continue;
5363                 } else {
5364                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5365                         goto reconnect;
5366                 }
5367
5368                 if (received == expect && cmd == NULL) {
5369                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5370                                 goto reconnect;
5371                         cmd = &asender_tbl[pi.cmd];
5372                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5373                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5374                                          cmdname(pi.cmd), pi.cmd);
5375                                 goto disconnect;
5376                         }
5377                         expect = header_size + cmd->pkt_size;
5378                         if (pi.size != expect - header_size) {
5379                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5380                                         pi.cmd, pi.size);
5381                                 goto reconnect;
5382                         }
5383                 }
5384                 if (received == expect) {
5385                         bool err;
5386
5387                         err = cmd->fn(tconn, &pi);
5388                         if (err) {
5389                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5390                                 goto reconnect;
5391                         }
5392
5393                         tconn->last_received = jiffies;
5394
5395                         if (cmd == &asender_tbl[P_PING_ACK]) {
5396                                 /* restore idle timeout */
5397                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5398                                 ping_timeout_active = false;
5399                         }
5400
5401                         buf      = tconn->meta.rbuf;
5402                         received = 0;
5403                         expect   = header_size;
5404                         cmd      = NULL;
5405                 }
5406         }
5407
5408         if (0) {
5409 reconnect:
5410                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5411         }
5412         if (0) {
5413 disconnect:
5414                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5415         }
5416         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5417
5418         conn_info(tconn, "asender terminated\n");
5419
5420         return 0;
5421 }