drbd: fix race when forcefully disconnecting
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
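/* Illustrative layout of such a chain, assuming page_chain_next() simply
 * reads page->private (as defined in drbd_int.h):
 *
 *	*head --private--> page --private--> page --private--> 0
 *
 * The chain is terminated by set_page_private(last_page, 0).
 */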
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that has not finished,
208            we can stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
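/* Example use (illustrative only, not taken from a caller in this file):
 *
 *	struct page *chain = drbd_alloc_pages(mdev, 4, true);
 *	if (chain) {
 *		...
 *		drbd_free_pages(mdev, chain, 0);
 *	}
 *
 * With retry == true the call sleeps until pages can be reclaimed or a
 * signal is pending; with retry == false it may return NULL right away.
 */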
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
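/* Illustrative pairing of the two rules above: a caller that already holds
 * the req_lock uses the underscore variant directly, e.g.
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * while lock-free callers go through drbd_wait_ee_list_empty(), which takes
 * and releases the lock itself (see both functions further down).
 */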
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, and e_end_resync_block, e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept, which is only present since 2.6.18.
465  * also, we want to log exactly which part of it failed */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
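/* Illustrative summary of the receive helpers above:
 *	drbd_recv()          returns the byte count (possibly short) or a
 *	                     negative error; a short result also forces the
 *	                     connection into C_BROKEN_PIPE
 *	drbd_recv_all()      0 on success, -EIO on a short read, negative
 *	                     errno otherwise
 *	drbd_recv_all_warn() same, but warns about short reads unless a
 *	                     signal is pending
 */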
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, try_connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         try_connect_int = nc->try_connect_int;
633
634         my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, nc->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, nc->peer_addr, peer_addr_len);
644
645         rcu_read_unlock();
646
647         what = "sock_create_kern";
648         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
649                                SOCK_STREAM, IPPROTO_TCP, &sock);
650         if (err < 0) {
651                 sock = NULL;
652                 goto out;
653         }
654
655         sock->sk->sk_rcvtimeo =
656         sock->sk->sk_sndtimeo = try_connect_int * HZ;
657         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
658
659        /* explicitly bind to the configured IP as source IP
660         *  for the outgoing connections.
661         *  This is needed for multihomed hosts and to be
662         *  able to use lo: interfaces for drbd.
663         * Make sure to use 0 as port number, so linux selects
664         *  a free one dynamically.
665         */
666         what = "bind before connect";
667         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
668         if (err < 0)
669                 goto out;
670
671         /* connect may fail, peer not yet available.
672          * stay C_WF_CONNECTION, don't go Disconnecting! */
673         disconnect_on_error = 0;
674         what = "connect";
675         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
676
677 out:
678         if (err < 0) {
679                 if (sock) {
680                         sock_release(sock);
681                         sock = NULL;
682                 }
683                 switch (-err) {
684                         /* timeout, busy, signal pending */
685                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
686                 case EINTR: case ERESTARTSYS:
687                         /* peer not (yet) available, network problem */
688                 case ECONNREFUSED: case ENETUNREACH:
689                 case EHOSTDOWN:    case EHOSTUNREACH:
690                         disconnect_on_error = 0;
691                         break;
692                 default:
693                         conn_err(tconn, "%s failed, err = %d\n", what, err);
694                 }
695                 if (disconnect_on_error)
696                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
697         }
698
699         return sock;
700 }
701
702 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
703 {
704         int timeo, err, my_addr_len;
705         int sndbuf_size, rcvbuf_size, try_connect_int;
706         struct socket *s_estab = NULL, *s_listen;
707         struct sockaddr_in6 my_addr;
708         struct net_conf *nc;
709         const char *what;
710
711         rcu_read_lock();
712         nc = rcu_dereference(tconn->net_conf);
713         if (!nc) {
714                 rcu_read_unlock();
715                 return NULL;
716         }
717
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         try_connect_int = nc->try_connect_int;
721
722         my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, nc->my_addr, my_addr_len);
724         rcu_read_unlock();
725
726         what = "sock_create_kern";
727         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
728                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         timeo = try_connect_int * HZ;
735         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
736
737         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
738         s_listen->sk->sk_rcvtimeo = timeo;
739         s_listen->sk->sk_sndtimeo = timeo;
740         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
741
742         what = "bind before listen";
743         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
744         if (err < 0)
745                 goto out;
746
747         err = drbd_accept(&what, s_listen, &s_estab);
748
749 out:
750         if (s_listen)
751                 sock_release(s_listen);
752         if (err < 0) {
753                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754                         conn_err(tconn, "%s failed, err = %d\n", what, err);
755                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
756                 }
757         }
758
759         return s_estab;
760 }
761
762 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
763
764 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
765                              enum drbd_packet cmd)
766 {
767         if (!conn_prepare_command(tconn, sock))
768                 return -EIO;
769         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
770 }
771
772 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
773 {
774         unsigned int header_size = drbd_header_size(tconn);
775         struct packet_info pi;
776         int err;
777
778         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
779         if (err != header_size) {
780                 if (err >= 0)
781                         err = -EIO;
782                 return err;
783         }
784         err = decode_header(tconn, tconn->data.rbuf, &pi);
785         if (err)
786                 return err;
787         return pi.cmd;
788 }
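/* Illustrative use of the two helpers above (see drbd_connect() below):
 * whichever side successfully establishes an outgoing socket sends
 * P_INITIAL_DATA for the data socket and P_INITIAL_META for the meta
 * socket; the listening side calls receive_first_packet() on an accepted
 * socket and uses the returned command to tell the two apart.
 */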
789
790 /**
791  * drbd_socket_okay() - Free the socket if its connection is not okay
792  * @sock:       pointer to the pointer to the socket.
793  */
794 static int drbd_socket_okay(struct socket **sock)
795 {
796         int rr;
797         char tb[4];
798
799         if (!*sock)
800                 return false;
801
802         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
803
804         if (rr > 0 || rr == -EAGAIN) {
805                 return true;
806         } else {
807                 sock_release(*sock);
808                 *sock = NULL;
809                 return false;
810         }
811 }
812 /* Gets called if a connection is established, or if a new minor gets created
813    in a connection */
814 int drbd_connected(int vnr, void *p, void *data)
815 {
816         struct drbd_conf *mdev = (struct drbd_conf *)p;
817         int err;
818
819         atomic_set(&mdev->packet_seq, 0);
820         mdev->peer_seq = 0;
821
822         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
823                 &mdev->tconn->cstate_mutex :
824                 &mdev->own_state_mutex;
825
826         err = drbd_send_sync_param(mdev);
827         if (!err)
828                 err = drbd_send_sizes(mdev, 0, 0);
829         if (!err)
830                 err = drbd_send_uuids(mdev);
831         if (!err)
832                 err = drbd_send_state(mdev);
833         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
834         clear_bit(RESIZE_PENDING, &mdev->flags);
835         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
836         return err;
837 }
838
839 /*
840  * return values:
841  *   1 yes, we have a valid connection
842  *   0 oops, did not work out, please try again
843  *  -1 peer talks different language,
844  *     no point in trying again, please go standalone.
845  *  -2 We do not have a network config...
846  */
847 static int drbd_connect(struct drbd_tconn *tconn)
848 {
849         struct socket *sock, *msock;
850         struct net_conf *nc;
851         int timeout, try, h, ok;
852
853         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854                 return -2;
855
856         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
857
858         /* Assume that the peer only understands protocol 80 until we know better.  */
859         tconn->agreed_pro_version = 80;
860
861         do {
862                 struct socket *s;
863
864                 for (try = 0;;) {
865                         /* 3 tries, this should take less than a second! */
866                         s = drbd_try_connect(tconn);
867                         if (s || ++try >= 3)
868                                 break;
869                         /* give the other side time to call bind() & listen() */
870                         schedule_timeout_interruptible(HZ / 10);
871                 }
872
873                 if (s) {
874                         if (!tconn->data.socket) {
875                                 tconn->data.socket = s;
876                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
877                         } else if (!tconn->meta.socket) {
878                                 tconn->meta.socket = s;
879                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
880                         } else {
881                                 conn_err(tconn, "Logic error in drbd_connect()\n");
882                                 goto out_release_sockets;
883                         }
884                 }
885
886                 if (tconn->data.socket && tconn->meta.socket) {
887                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
888                         ok = drbd_socket_okay(&tconn->data.socket);
889                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
890                         if (ok)
891                                 break;
892                 }
893
894 retry:
895                 s = drbd_wait_for_connect(tconn);
896                 if (s) {
897                         try = receive_first_packet(tconn, s);
898                         drbd_socket_okay(&tconn->data.socket);
899                         drbd_socket_okay(&tconn->meta.socket);
900                         switch (try) {
901                         case P_INITIAL_DATA:
902                                 if (tconn->data.socket) {
903                                         conn_warn(tconn, "initial packet S crossed\n");
904                                         sock_release(tconn->data.socket);
905                                 }
906                                 tconn->data.socket = s;
907                                 break;
908                         case P_INITIAL_META:
909                                 if (tconn->meta.socket) {
910                                         conn_warn(tconn, "initial packet M crossed\n");
911                                         sock_release(tconn->meta.socket);
912                                 }
913                                 tconn->meta.socket = s;
914                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
915                                 break;
916                         default:
917                                 conn_warn(tconn, "Error receiving initial packet\n");
918                                 sock_release(s);
919                                 if (random32() & 1)
920                                         goto retry;
921                         }
922                 }
923
924                 if (tconn->cstate <= C_DISCONNECTING)
925                         goto out_release_sockets;
926                 if (signal_pending(current)) {
927                         flush_signals(current);
928                         smp_rmb();
929                         if (get_t_state(&tconn->receiver) == EXITING)
930                                 goto out_release_sockets;
931                 }
932
933                 if (tconn->data.socket && tconn->meta.socket) {
934                         ok = drbd_socket_okay(&tconn->data.socket);
935                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
936                         if (ok)
937                                 break;
938                 }
939         } while (1);
940
941         sock  = tconn->data.socket;
942         msock = tconn->meta.socket;
943
944         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
945         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
946
947         sock->sk->sk_allocation = GFP_NOIO;
948         msock->sk->sk_allocation = GFP_NOIO;
949
950         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
951         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
952
953         /* NOT YET ...
954          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
955          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
956          * first set it to the P_CONNECTION_FEATURES timeout,
957          * which we set to 4x the configured ping_timeout. */
958         rcu_read_lock();
959         nc = rcu_dereference(tconn->net_conf);
960
961         sock->sk->sk_sndtimeo =
962         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
963
964         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
965         timeout = nc->timeout * HZ / 10;
966         rcu_read_unlock();
967
968         msock->sk->sk_sndtimeo = timeout;
969
970         /* we don't want delays.
971          * we use TCP_CORK where appropriate, though */
972         drbd_tcp_nodelay(sock);
973         drbd_tcp_nodelay(msock);
974
975         tconn->last_received = jiffies;
976
977         h = drbd_do_features(tconn);
978         if (h <= 0)
979                 return h;
980
981         if (tconn->cram_hmac_tfm) {
982                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
983                 switch (drbd_do_auth(tconn)) {
984                 case -1:
985                         conn_err(tconn, "Authentication of peer failed\n");
986                         return -1;
987                 case 0:
988                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
989                         return 0;
990                 }
991         }
992
993         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
994                 return 0;
995
996         sock->sk->sk_sndtimeo = timeout;
997         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
998
999         drbd_thread_start(&tconn->asender);
1000
1001         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1002                 return -1;
1003
1004         down_read(&drbd_cfg_rwsem);
1005         h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
1006         up_read(&drbd_cfg_rwsem);
1007         return h;
1008
1009 out_release_sockets:
1010         if (tconn->data.socket) {
1011                 sock_release(tconn->data.socket);
1012                 tconn->data.socket = NULL;
1013         }
1014         if (tconn->meta.socket) {
1015                 sock_release(tconn->meta.socket);
1016                 tconn->meta.socket = NULL;
1017         }
1018         return -1;
1019 }
1020
1021 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1022 {
1023         unsigned int header_size = drbd_header_size(tconn);
1024
1025         if (header_size == sizeof(struct p_header100) &&
1026             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1027                 struct p_header100 *h = header;
1028                 if (h->pad != 0) {
1029                         conn_err(tconn, "Header padding is not zero\n");
1030                         return -EINVAL;
1031                 }
1032                 pi->vnr = be16_to_cpu(h->volume);
1033                 pi->cmd = be16_to_cpu(h->command);
1034                 pi->size = be32_to_cpu(h->length);
1035         } else if (header_size == sizeof(struct p_header95) &&
1036                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1037                 struct p_header95 *h = header;
1038                 pi->cmd = be16_to_cpu(h->command);
1039                 pi->size = be32_to_cpu(h->length);
1040                 pi->vnr = 0;
1041         } else if (header_size == sizeof(struct p_header80) &&
1042                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1043                 struct p_header80 *h = header;
1044                 pi->cmd = be16_to_cpu(h->command);
1045                 pi->size = be16_to_cpu(h->length);
1046                 pi->vnr = 0;
1047         } else {
1048                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1049                          be32_to_cpu(*(__be32 *)header),
1050                          tconn->agreed_pro_version);
1051                 return -EINVAL;
1052         }
1053         pi->data = header + header_size;
1054         return 0;
1055 }
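/* Illustrative summary of what decode_header() accepts (field widths match
 * the byte-order conversions above; see the p_header* structs for layout):
 *	protocol 100: 32-bit magic DRBD_MAGIC_100, 16-bit volume,
 *	              16-bit command, 32-bit length
 *	protocol 95:  16-bit magic DRBD_MAGIC_BIG, 16-bit command, 32-bit length
 *	protocol 80:  32-bit magic DRBD_MAGIC, 16-bit command, 16-bit length
 * In all cases the payload follows the header: pi->data = header + header_size.
 */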
1056
1057 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1058 {
1059         void *buffer = tconn->data.rbuf;
1060         int err;
1061
1062         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1063         if (err)
1064                 return err;
1065
1066         err = decode_header(tconn, buffer, pi);
1067         tconn->last_received = jiffies;
1068
1069         return err;
1070 }
1071
1072 static void drbd_flush(struct drbd_conf *mdev)
1073 {
1074         int rv;
1075
1076         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1077                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1078                                         NULL);
1079                 if (rv) {
1080                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1081                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1082                          * don't try again for ANY return value != 0
1083                          * if (rv == -EOPNOTSUPP) */
1084                         drbd_bump_write_ordering(mdev, WO_drain_io);
1085                 }
1086                 put_ldev(mdev);
1087         }
1088 }
1089
1090 /**
1091  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1092  * @mdev:       DRBD device.
1093  * @epoch:      Epoch object.
1094  * @ev:         Epoch event.
1095  */
1096 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1097                                                struct drbd_epoch *epoch,
1098                                                enum epoch_event ev)
1099 {
1100         int epoch_size;
1101         struct drbd_epoch *next_epoch;
1102         enum finish_epoch rv = FE_STILL_LIVE;
1103
1104         spin_lock(&mdev->epoch_lock);
1105         do {
1106                 next_epoch = NULL;
1107
1108                 epoch_size = atomic_read(&epoch->epoch_size);
1109
1110                 switch (ev & ~EV_CLEANUP) {
1111                 case EV_PUT:
1112                         atomic_dec(&epoch->active);
1113                         break;
1114                 case EV_GOT_BARRIER_NR:
1115                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1116                         break;
1117                 case EV_BECAME_LAST:
1118                         /* nothing to do */
1119                         break;
1120                 }
1121
1122                 if (epoch_size != 0 &&
1123                     atomic_read(&epoch->active) == 0 &&
1124                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1125                         if (!(ev & EV_CLEANUP)) {
1126                                 spin_unlock(&mdev->epoch_lock);
1127                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1128                                 spin_lock(&mdev->epoch_lock);
1129                         }
1130                         dec_unacked(mdev);
1131
1132                         if (mdev->current_epoch != epoch) {
1133                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1134                                 list_del(&epoch->list);
1135                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1136                                 mdev->epochs--;
1137                                 kfree(epoch);
1138
1139                                 if (rv == FE_STILL_LIVE)
1140                                         rv = FE_DESTROYED;
1141                         } else {
1142                                 epoch->flags = 0;
1143                                 atomic_set(&epoch->epoch_size, 0);
1144                                 /* atomic_set(&epoch->active, 0); is already zero */
1145                                 if (rv == FE_STILL_LIVE)
1146                                         rv = FE_RECYCLED;
1147                                 wake_up(&mdev->ee_wait);
1148                         }
1149                 }
1150
1151                 if (!next_epoch)
1152                         break;
1153
1154                 epoch = next_epoch;
1155         } while (1);
1156
1157         spin_unlock(&mdev->epoch_lock);
1158
1159         return rv;
1160 }
1161
1162 /**
1163  * drbd_bump_write_ordering() - Fall back to another write ordering method
1164  * @mdev:       DRBD device.
1165  * @wo:         Write ordering method to try.
1166  */
1167 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1168 {
1169         enum write_ordering_e pwo;
1170         static char *write_ordering_str[] = {
1171                 [WO_none] = "none",
1172                 [WO_drain_io] = "drain",
1173                 [WO_bdev_flush] = "flush",
1174         };
1175
1176         pwo = mdev->write_ordering;
1177         wo = min(pwo, wo);
1178         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1179                 wo = WO_drain_io;
1180         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1181                 wo = WO_none;
1182         mdev->write_ordering = wo;
1183         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1184                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1185 }
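/* Example (illustrative): when a local flush fails, drbd_flush() above calls
 *
 *	drbd_bump_write_ordering(mdev, WO_drain_io);
 *
 * Because of the min(pwo, wo) above, this can only ever lower the method
 * (flush -> drain -> none), never promote it again.
 */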
1186
1187 /**
1188  * drbd_submit_peer_request() - Submit a peer request to the local backing device
1189  * @mdev:       DRBD device.
1190  * @peer_req:   peer request
1191  * @rw:         flag field, see bio->bi_rw
1192  *
1193  * May spread the pages to multiple bios,
1194  * depending on bio_add_page restrictions.
1195  *
1196  * Returns 0 if all bios have been submitted,
1197  * -ENOMEM if we could not allocate enough bios,
1198  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1199  *  single page to an empty bio (which should never happen and likely indicates
1200  *  that the lower level IO stack is in some way broken). This has been observed
1201  *  on certain Xen deployments.
1202  */
1203 /* TODO allocate from our own bio_set. */
1204 int drbd_submit_peer_request(struct drbd_conf *mdev,
1205                              struct drbd_peer_request *peer_req,
1206                              const unsigned rw, const int fault_type)
1207 {
1208         struct bio *bios = NULL;
1209         struct bio *bio;
1210         struct page *page = peer_req->pages;
1211         sector_t sector = peer_req->i.sector;
1212         unsigned ds = peer_req->i.size;
1213         unsigned n_bios = 0;
1214         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1215         int err = -ENOMEM;
1216
1217         /* In most cases, we will only need one bio.  But in case the lower
1218          * level restrictions happen to be different at this offset on this
1219          * side than those of the sending peer, we may need to submit the
1220          * request in more than one bio.
1221          *
1222          * Plain bio_alloc is good enough here, this is no DRBD internally
1223          * generated bio, but a bio allocated on behalf of the peer.
1224          */
1225 next_bio:
1226         bio = bio_alloc(GFP_NOIO, nr_pages);
1227         if (!bio) {
1228                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1229                 goto fail;
1230         }
1231         /* > peer_req->i.sector, unless this is the first bio */
1232         bio->bi_sector = sector;
1233         bio->bi_bdev = mdev->ldev->backing_bdev;
1234         bio->bi_rw = rw;
1235         bio->bi_private = peer_req;
1236         bio->bi_end_io = drbd_peer_request_endio;
1237
1238         bio->bi_next = bios;
1239         bios = bio;
1240         ++n_bios;
1241
1242         page_chain_for_each(page) {
1243                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1244                 if (!bio_add_page(bio, page, len, 0)) {
1245                         /* A single page must always be possible!
1246                          * But in case it fails anyways,
1247                          * we deal with it, and complain (below). */
1248                         if (bio->bi_vcnt == 0) {
1249                                 dev_err(DEV,
1250                                         "bio_add_page failed for len=%u, "
1251                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1252                                         len, (unsigned long long)bio->bi_sector);
1253                                 err = -ENOSPC;
1254                                 goto fail;
1255                         }
1256                         goto next_bio;
1257                 }
1258                 ds -= len;
1259                 sector += len >> 9;
1260                 --nr_pages;
1261         }
1262         D_ASSERT(page == NULL);
1263         D_ASSERT(ds == 0);
1264
1265         atomic_set(&peer_req->pending_bios, n_bios);
1266         do {
1267                 bio = bios;
1268                 bios = bios->bi_next;
1269                 bio->bi_next = NULL;
1270
1271                 drbd_generic_make_request(mdev, fault_type, bio);
1272         } while (bios);
1273         return 0;
1274
1275 fail:
1276         while (bios) {
1277                 bio = bios;
1278                 bios = bios->bi_next;
1279                 bio_put(bio);
1280         }
1281         return err;
1282 }
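/* Example (illustrative): recv_resync_read() further down submits a resync
 * write with
 *
 *	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
 *		return 0;
 *
 * and treats any non-zero return as "trigger re-connect".
 */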
1283
1284 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1285                                              struct drbd_peer_request *peer_req)
1286 {
1287         struct drbd_interval *i = &peer_req->i;
1288
1289         drbd_remove_interval(&mdev->write_requests, i);
1290         drbd_clear_interval(i);
1291
1292         /* Wake up any processes waiting for this peer request to complete.  */
1293         if (i->waiting)
1294                 wake_up(&mdev->misc_wait);
1295 }
1296
1297 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1298 {
1299         struct drbd_conf *mdev;
1300         int rv;
1301         struct p_barrier *p = pi->data;
1302         struct drbd_epoch *epoch;
1303
1304         mdev = vnr_to_mdev(tconn, pi->vnr);
1305         if (!mdev)
1306                 return -EIO;
1307
1308         inc_unacked(mdev);
1309
1310         mdev->current_epoch->barrier_nr = p->barrier;
1311         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1312
1313         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1314          * the activity log, which means it would not be resynced in case the
1315          * R_PRIMARY crashes now.
1316          * Therefore we must send the barrier_ack after the barrier request was
1317          * completed. */
1318         switch (mdev->write_ordering) {
1319         case WO_none:
1320                 if (rv == FE_RECYCLED)
1321                         return 0;
1322
1323                 /* receiver context, in the writeout path of the other node.
1324                  * avoid potential distributed deadlock */
1325                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1326                 if (epoch)
1327                         break;
1328                 else
1329                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1330                         /* Fall through */
1331
1332         case WO_bdev_flush:
1333         case WO_drain_io:
1334                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1335                 drbd_flush(mdev);
1336
1337                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1338                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1339                         if (epoch)
1340                                 break;
1341                 }
1342
1343                 epoch = mdev->current_epoch;
1344                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1345
1346                 D_ASSERT(atomic_read(&epoch->active) == 0);
1347                 D_ASSERT(epoch->flags == 0);
1348
1349                 return 0;
1350         default:
1351                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1352                 return -EIO;
1353         }
1354
1355         epoch->flags = 0;
1356         atomic_set(&epoch->epoch_size, 0);
1357         atomic_set(&epoch->active, 0);
1358
1359         spin_lock(&mdev->epoch_lock);
1360         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1361                 list_add(&epoch->list, &mdev->current_epoch->list);
1362                 mdev->current_epoch = epoch;
1363                 mdev->epochs++;
1364         } else {
1365                 /* The current_epoch got recycled while we allocated this one... */
1366                 kfree(epoch);
1367         }
1368         spin_unlock(&mdev->epoch_lock);
1369
1370         return 0;
1371 }
1372
1373 /* used from receive_RSDataReply (recv_resync_read)
1374  * and from receive_Data */
1375 static struct drbd_peer_request *
1376 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1377               int data_size) __must_hold(local)
1378 {
1379         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1380         struct drbd_peer_request *peer_req;
1381         struct page *page;
1382         int dgs, ds, err;
1383         void *dig_in = mdev->tconn->int_dig_in;
1384         void *dig_vv = mdev->tconn->int_dig_vv;
1385         unsigned long *data;
1386
1387         dgs = 0;
1388         if (mdev->tconn->peer_integrity_tfm) {
1389                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1390                 /*
1391                  * FIXME: Receive the incoming digest into the receive buffer
1392                  *        here, together with its struct p_data?
1393                  */
1394                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1395                 if (err)
1396                         return NULL;
1397                 data_size -= dgs;
1398         }
1399
1400         if (!expect(data_size != 0))
1401                 return NULL;
1402         if (!expect(IS_ALIGNED(data_size, 512)))
1403                 return NULL;
1404         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1405                 return NULL;
1406
1407         /* even though we trust our peer,
1408          * we sometimes have to double check. */
1409         if (sector + (data_size>>9) > capacity) {
1410                 dev_err(DEV, "request from peer beyond end of local disk: "
1411                         "capacity: %llus < sector: %llus + size: %u\n",
1412                         (unsigned long long)capacity,
1413                         (unsigned long long)sector, data_size);
1414                 return NULL;
1415         }
1416
1417         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1418          * "criss-cross" setup, that might cause write-out on some other DRBD,
1419          * which in turn might block on the other node at this very place.  */
1420         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1421         if (!peer_req)
1422                 return NULL;
1423
1424         ds = data_size;
1425         page = peer_req->pages;
1426         page_chain_for_each(page) {
1427                 unsigned len = min_t(int, ds, PAGE_SIZE);
1428                 data = kmap(page);
1429                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1430                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1431                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1432                         data[0] = data[0] ^ (unsigned long)-1;
1433                 }
1434                 kunmap(page);
1435                 if (err) {
1436                         drbd_free_peer_req(mdev, peer_req);
1437                         return NULL;
1438                 }
1439                 ds -= len;
1440         }
1441
1442         if (dgs) {
1443                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1444                 if (memcmp(dig_in, dig_vv, dgs)) {
1445                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1446                                 (unsigned long long)sector, data_size);
1447                         drbd_free_peer_req(mdev, peer_req);
1448                         return NULL;
1449                 }
1450         }
1451         mdev->recv_cnt += data_size>>9;
1452         return peer_req;
1453 }
1454
1455 /* drbd_drain_block() just takes a data block
1456  * out of the socket input buffer, and discards it.
1457  */
1458 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1459 {
1460         struct page *page;
1461         int err = 0;
1462         void *data;
1463
1464         if (!data_size)
1465                 return 0;
1466
1467         page = drbd_alloc_pages(mdev, 1, 1);
1468
1469         data = kmap(page);
1470         while (data_size) {
1471                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1472
1473                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1474                 if (err)
1475                         break;
1476                 data_size -= len;
1477         }
1478         kunmap(page);
1479         drbd_free_pages(mdev, page, 0);
1480         return err;
1481 }
1482
1483 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1484                            sector_t sector, int data_size)
1485 {
1486         struct bio_vec *bvec;
1487         struct bio *bio;
1488         int dgs, err, i, expect;
1489         void *dig_in = mdev->tconn->int_dig_in;
1490         void *dig_vv = mdev->tconn->int_dig_vv;
1491
1492         dgs = 0;
1493         if (mdev->tconn->peer_integrity_tfm) {
1494                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1495                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1496                 if (err)
1497                         return err;
1498                 data_size -= dgs;
1499         }
1500
1501         /* optimistically update recv_cnt.  if receiving fails below,
1502          * we disconnect anyways, and counters will be reset. */
1503         mdev->recv_cnt += data_size>>9;
1504
1505         bio = req->master_bio;
1506         D_ASSERT(sector == bio->bi_sector);
1507
1508         bio_for_each_segment(bvec, bio, i) {
1509                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1510                 expect = min_t(int, data_size, bvec->bv_len);
1511                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1512                 kunmap(bvec->bv_page);
1513                 if (err)
1514                         return err;
1515                 data_size -= expect;
1516         }
1517
1518         if (dgs) {
1519                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1520                 if (memcmp(dig_in, dig_vv, dgs)) {
1521                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1522                         return -EINVAL;
1523                 }
1524         }
1525
1526         D_ASSERT(data_size == 0);
1527         return 0;
1528 }
1529
1530 /*
1531  * e_end_resync_block() is called in asender context via
1532  * drbd_finish_peer_reqs().
1533  */
1534 static int e_end_resync_block(struct drbd_work *w, int unused)
1535 {
1536         struct drbd_peer_request *peer_req =
1537                 container_of(w, struct drbd_peer_request, w);
1538         struct drbd_conf *mdev = w->mdev;
1539         sector_t sector = peer_req->i.sector;
1540         int err;
1541
1542         D_ASSERT(drbd_interval_empty(&peer_req->i));
1543
1544         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1545                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1546                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1547         } else {
1548                 /* Record failure to sync */
1549                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1550
1551                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1552         }
1553         dec_unacked(mdev);
1554
1555         return err;
1556 }
1557
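/*
 * recv_resync_read(): read one resync data block from the socket into a
 * freshly allocated peer request, queue it on mdev->sync_ee and submit it
 * as a local WRITE.  The matching put_ldev() happens on the error path
 * below, or in drbd_peer_request_endio() once the write completes.
 */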
1558 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1559 {
1560         struct drbd_peer_request *peer_req;
1561
1562         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1563         if (!peer_req)
1564                 goto fail;
1565
1566         dec_rs_pending(mdev);
1567
1568         inc_unacked(mdev);
1569         /* corresponding dec_unacked() in e_end_resync_block(),
1570          * or in _drbd_clear_done_ee(), respectively */
1571
1572         peer_req->w.cb = e_end_resync_block;
1573
1574         spin_lock_irq(&mdev->tconn->req_lock);
1575         list_add(&peer_req->w.list, &mdev->sync_ee);
1576         spin_unlock_irq(&mdev->tconn->req_lock);
1577
1578         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1579         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1580                 return 0;
1581
1582         /* don't care for the reason here */
1583         dev_err(DEV, "submit failed, triggering re-connect\n");
1584         spin_lock_irq(&mdev->tconn->req_lock);
1585         list_del(&peer_req->w.list);
1586         spin_unlock_irq(&mdev->tconn->req_lock);
1587
1588         drbd_free_peer_req(mdev, peer_req);
1589 fail:
1590         put_ldev(mdev);
1591         return -EIO;
1592 }
1593
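/*
 * The peer echoes our request pointer back to us as the block_id of its
 * reply.  Before dereferencing it as a struct drbd_request, verify that the
 * interval at that address really is part of the given tree for this
 * sector; otherwise treat the reply as referring to a missing request.
 */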
1594 static struct drbd_request *
1595 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1596              sector_t sector, bool missing_ok, const char *func)
1597 {
1598         struct drbd_request *req;
1599
1600         /* Request object according to our peer */
1601         req = (struct drbd_request *)(unsigned long)id;
1602         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1603                 return req;
1604         if (!missing_ok) {
1605                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1606                         (unsigned long)id, (unsigned long long)sector);
1607         }
1608         return NULL;
1609 }
1610
1611 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1612 {
1613         struct drbd_conf *mdev;
1614         struct drbd_request *req;
1615         sector_t sector;
1616         int err;
1617         struct p_data *p = pi->data;
1618
1619         mdev = vnr_to_mdev(tconn, pi->vnr);
1620         if (!mdev)
1621                 return -EIO;
1622
1623         sector = be64_to_cpu(p->sector);
1624
1625         spin_lock_irq(&mdev->tconn->req_lock);
1626         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1627         spin_unlock_irq(&mdev->tconn->req_lock);
1628         if (unlikely(!req))
1629                 return -EIO;
1630
1631         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1632          * special casing it there for the various failure cases.
1633          * still no race with drbd_fail_pending_reads */
1634         err = recv_dless_read(mdev, req, sector, pi->size);
1635         if (!err)
1636                 req_mod(req, DATA_RECEIVED);
1637         /* else: nothing. handled from drbd_disconnect...
1638          * I don't think we may complete this just yet
1639          * in case we are "on-disconnect: freeze" */
1640
1641         return err;
1642 }
1643
1644 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1645 {
1646         struct drbd_conf *mdev;
1647         sector_t sector;
1648         int err;
1649         struct p_data *p = pi->data;
1650
1651         mdev = vnr_to_mdev(tconn, pi->vnr);
1652         if (!mdev)
1653                 return -EIO;
1654
1655         sector = be64_to_cpu(p->sector);
1656         D_ASSERT(p->block_id == ID_SYNCER);
1657
1658         if (get_ldev(mdev)) {
1659                 /* data is submitted to disk within recv_resync_read.
1660                  * corresponding put_ldev done below on error,
1661                  * or in drbd_peer_request_endio. */
1662                 err = recv_resync_read(mdev, sector, pi->size);
1663         } else {
1664                 if (__ratelimit(&drbd_ratelimit_state))
1665                         dev_err(DEV, "Can not write resync data to local disk.\n");
1666
1667                 err = drbd_drain_block(mdev, pi->size);
1668
1669                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1670         }
1671
1672         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1673
1674         return err;
1675 }
1676
1677 static int w_restart_write(struct drbd_work *w, int cancel)
1678 {
1679         struct drbd_request *req = container_of(w, struct drbd_request, w);
1680         struct drbd_conf *mdev = w->mdev;
1681         struct bio *bio;
1682         unsigned long start_time;
1683         unsigned long flags;
1684
1685         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1686         if (!expect(req->rq_state & RQ_POSTPONED)) {
1687                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1688                 return -EIO;
1689         }
1690         bio = req->master_bio;
1691         start_time = req->start_time;
1692         /* Postponed requests will not have their master_bio completed!  */
1693         __req_mod(req, DISCARD_WRITE, NULL);
1694         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1695
1696         while (__drbd_make_request(mdev, bio, start_time))
1697                 /* retry */ ;
1698         return 0;
1699 }
1700
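/*
 * restart_conflicting_writes(): requeue all postponed local writes that
 * overlap the given range.  Each such request is handed to the worker as
 * w_restart_write(), which resubmits its master bio now that the
 * conflicting peer write has completed.
 */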
1701 static void restart_conflicting_writes(struct drbd_conf *mdev,
1702                                        sector_t sector, int size)
1703 {
1704         struct drbd_interval *i;
1705         struct drbd_request *req;
1706
1707         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1708                 if (!i->local)
1709                         continue;
1710                 req = container_of(i, struct drbd_request, i);
1711                 if (req->rq_state & RQ_LOCAL_PENDING ||
1712                     !(req->rq_state & RQ_POSTPONED))
1713                         continue;
1714                 if (expect(list_empty(&req->w.list))) {
1715                         req->w.mdev = mdev;
1716                         req->w.cb = w_restart_write;
1717                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1718                 }
1719         }
1720 }
1721
1722 /*
1723  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1724  */
1725 static int e_end_block(struct drbd_work *w, int cancel)
1726 {
1727         struct drbd_peer_request *peer_req =
1728                 container_of(w, struct drbd_peer_request, w);
1729         struct drbd_conf *mdev = w->mdev;
1730         sector_t sector = peer_req->i.sector;
1731         int err = 0, pcmd;
1732
1733         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1734                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1735                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1736                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1737                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1738                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1739                         err = drbd_send_ack(mdev, pcmd, peer_req);
1740                         if (pcmd == P_RS_WRITE_ACK)
1741                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1742                 } else {
1743                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1744                         /* we expect it to be marked out of sync anyways...
1745                          * maybe assert this?  */
1746                 }
1747                 dec_unacked(mdev);
1748         }
1749         /* we delete from the conflict detection hash _after_ we sent out the
1750          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1751         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1752                 spin_lock_irq(&mdev->tconn->req_lock);
1753                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1754                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1755                 if (peer_req->flags & EE_RESTART_REQUESTS)
1756                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1757                 spin_unlock_irq(&mdev->tconn->req_lock);
1758         } else
1759                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1760
1761         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1762
1763         return err;
1764 }
1765
1766 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1767 {
1768         struct drbd_conf *mdev = w->mdev;
1769         struct drbd_peer_request *peer_req =
1770                 container_of(w, struct drbd_peer_request, w);
1771         int err;
1772
1773         err = drbd_send_ack(mdev, ack, peer_req);
1774         dec_unacked(mdev);
1775
1776         return err;
1777 }
1778
1779 static int e_send_discard_write(struct drbd_work *w, int unused)
1780 {
1781         return e_send_ack(w, P_DISCARD_WRITE);
1782 }
1783
1784 static int e_send_retry_write(struct drbd_work *w, int unused)
1785 {
1786         struct drbd_tconn *tconn = w->mdev->tconn;
1787
1788         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1789                              P_RETRY_WRITE : P_DISCARD_WRITE);
1790 }
1791
1792 static bool seq_greater(u32 a, u32 b)
1793 {
1794         /*
1795          * We assume 32-bit wrap-around here.
1796          * For 24-bit wrap-around, we would have to shift:
1797          *  a <<= 8; b <<= 8;
1798          */
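        /*
         * Illustrative example: seq_greater(1, 0xffffffff) computes
         * (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0, so sequence 1 is
         * correctly treated as newer than 0xffffffff despite being
         * numerically smaller.
         */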
1799         return (s32)a - (s32)b > 0;
1800 }
1801
1802 static u32 seq_max(u32 a, u32 b)
1803 {
1804         return seq_greater(a, b) ? a : b;
1805 }
1806
1807 static bool need_peer_seq(struct drbd_conf *mdev)
1808 {
1809         struct drbd_tconn *tconn = mdev->tconn;
1810         int tp;
1811
1812         /*
1813          * We only need to keep track of the last packet_seq number of our peer
1814          * if we are in dual-primary mode and we have the discard flag set; see
1815          * handle_write_conflicts().
1816          */
1817
1818         rcu_read_lock();
1819         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1820         rcu_read_unlock();
1821
1822         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1823 }
1824
1825 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1826 {
1827         unsigned int newest_peer_seq;
1828
1829         if (need_peer_seq(mdev)) {
1830                 spin_lock(&mdev->peer_seq_lock);
1831                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1832                 mdev->peer_seq = newest_peer_seq;
1833                 spin_unlock(&mdev->peer_seq_lock);
1834                 /* wake up only if we actually changed mdev->peer_seq */
1835                 if (peer_seq == newest_peer_seq)
1836                         wake_up(&mdev->seq_wait);
1837         }
1838 }
1839
1840 /* Called from receive_Data.
1841  * Synchronize packets on sock with packets on msock.
1842  *
1843  * This is here so that even when a P_DATA packet traveling via sock overtakes an Ack
1844  * packet traveling on msock, they are still processed in the order they were
1845  * sent.
1846  *
1847  * Note: we don't care for Ack packets overtaking P_DATA packets.
1848  *
1849  * In case peer_seq is larger than mdev->peer_seq, there are still
1850  * outstanding packets on the msock. We wait for them to arrive.
1851  * In case we are the logically next packet, we update mdev->peer_seq
1852  * ourselves. Correctly handles 32bit wrap around.
1853  *
1854  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1855  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1856  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1857  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1858  *
1859  * returns 0 if we may process the packet,
1860  * -ERESTARTSYS if interrupted (by disconnect signal), -ETIMEDOUT on timeout. */
1861 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1862 {
1863         DEFINE_WAIT(wait);
1864         long timeout;
1865         int ret;
1866
1867         if (!need_peer_seq(mdev))
1868                 return 0;
1869
1870         spin_lock(&mdev->peer_seq_lock);
1871         for (;;) {
1872                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1873                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1874                         ret = 0;
1875                         break;
1876                 }
1877                 if (signal_pending(current)) {
1878                         ret = -ERESTARTSYS;
1879                         break;
1880                 }
1881                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1882                 spin_unlock(&mdev->peer_seq_lock);
1883                 rcu_read_lock();
1884                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1885                 rcu_read_unlock();
1886                 timeout = schedule_timeout(timeout);
1887                 spin_lock(&mdev->peer_seq_lock);
1888                 if (!timeout) {
1889                         ret = -ETIMEDOUT;
1890                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1891                         break;
1892                 }
1893         }
1894         spin_unlock(&mdev->peer_seq_lock);
1895         finish_wait(&mdev->seq_wait, &wait);
1896         return ret;
1897 }
1898
1899 /* see also bio_flags_to_wire()
1900  * DRBD_REQ_*: we need to semantically map the bio flags to data packet
1901  * flags and back, because we may replicate to other kernel versions. */
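/*
 * Example mapping (illustrative): a peer write sent with DP_RW_SYNC | DP_FUA
 * is submitted locally with REQ_SYNC | REQ_FUA; DP_DISCARD maps to
 * REQ_DISCARD accordingly.
 */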
1902 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1903 {
1904         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1905                 (dpf & DP_FUA ? REQ_FUA : 0) |
1906                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1907                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1908 }
1909
1910 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1911                                     unsigned int size)
1912 {
1913         struct drbd_interval *i;
1914
1915     repeat:
1916         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1917                 struct drbd_request *req;
1918                 struct bio_and_error m;
1919
1920                 if (!i->local)
1921                         continue;
1922                 req = container_of(i, struct drbd_request, i);
1923                 if (!(req->rq_state & RQ_POSTPONED))
1924                         continue;
1925                 req->rq_state &= ~RQ_POSTPONED;
1926                 __req_mod(req, NEG_ACKED, &m);
1927                 spin_unlock_irq(&mdev->tconn->req_lock);
1928                 if (m.bio)
1929                         complete_master_bio(mdev, &m);
1930                 spin_lock_irq(&mdev->tconn->req_lock);
1931                 goto repeat;
1932         }
1933 }
1934
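/*
 * handle_write_conflicts(): called with req_lock held.  Inserts the peer
 * write into the write_requests tree and resolves any overlaps with local
 * requests or older peer requests.  Returns 0 if the peer request may be
 * submitted, -ENOENT if it was discarded or deferred to the asender
 * (P_DISCARD_WRITE / P_RETRY_WRITE), or a negative error if waiting for a
 * conflicting request was interrupted.
 */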
1935 static int handle_write_conflicts(struct drbd_conf *mdev,
1936                                   struct drbd_peer_request *peer_req)
1937 {
1938         struct drbd_tconn *tconn = mdev->tconn;
1939         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1940         sector_t sector = peer_req->i.sector;
1941         const unsigned int size = peer_req->i.size;
1942         struct drbd_interval *i;
1943         bool equal;
1944         int err;
1945
1946         /*
1947          * Inserting the peer request into the write_requests tree will prevent
1948          * new conflicting local requests from being added.
1949          */
1950         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1951
1952     repeat:
1953         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1954                 if (i == &peer_req->i)
1955                         continue;
1956
1957                 if (!i->local) {
1958                         /*
1959                          * Our peer has sent a conflicting remote request; this
1960                          * should not happen in a two-node setup.  Wait for the
1961                          * earlier peer request to complete.
1962                          */
1963                         err = drbd_wait_misc(mdev, i);
1964                         if (err)
1965                                 goto out;
1966                         goto repeat;
1967                 }
1968
1969                 equal = i->sector == sector && i->size == size;
1970                 if (resolve_conflicts) {
1971                         /*
1972                          * If the peer request is fully contained within the
1973                          * overlapping request, it can be discarded; otherwise,
1974                          * it will be retried once all overlapping requests
1975                          * have completed.
1976                          */
1977                         bool discard = i->sector <= sector && i->sector +
1978                                        (i->size >> 9) >= sector + (size >> 9);
1979
1980                         if (!equal)
1981                                 dev_alert(DEV, "Concurrent writes detected: "
1982                                                "local=%llus +%u, remote=%llus +%u, "
1983                                                "assuming %s came first\n",
1984                                           (unsigned long long)i->sector, i->size,
1985                                           (unsigned long long)sector, size,
1986                                           discard ? "local" : "remote");
1987
1988                         inc_unacked(mdev);
1989                         peer_req->w.cb = discard ? e_send_discard_write :
1990                                                    e_send_retry_write;
1991                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1992                         wake_asender(mdev->tconn);
1993
1994                         err = -ENOENT;
1995                         goto out;
1996                 } else {
1997                         struct drbd_request *req =
1998                                 container_of(i, struct drbd_request, i);
1999
2000                         if (!equal)
2001                                 dev_alert(DEV, "Concurrent writes detected: "
2002                                                "local=%llus +%u, remote=%llus +%u\n",
2003                                           (unsigned long long)i->sector, i->size,
2004                                           (unsigned long long)sector, size);
2005
2006                         if (req->rq_state & RQ_LOCAL_PENDING ||
2007                             !(req->rq_state & RQ_POSTPONED)) {
2008                                 /*
2009                                  * Wait for the node with the discard flag to
2010                                  * decide if this request will be discarded or
2011                                  * retried.  Requests that are discarded will
2012                                  * disappear from the write_requests tree.
2013                                  *
2014                                  * In addition, wait for the conflicting
2015                                  * request to finish locally before submitting
2016                                  * the conflicting peer request.
2017                                  */
2018                                 err = drbd_wait_misc(mdev, &req->i);
2019                                 if (err) {
2020                                         _conn_request_state(mdev->tconn,
2021                                                             NS(conn, C_TIMEOUT),
2022                                                             CS_HARD);
2023                                         fail_postponed_requests(mdev, sector, size);
2024                                         goto out;
2025                                 }
2026                                 goto repeat;
2027                         }
2028                         /*
2029                          * Remember to restart the conflicting requests after
2030                          * the new peer request has completed.
2031                          */
2032                         peer_req->flags |= EE_RESTART_REQUESTS;
2033                 }
2034         }
2035         err = 0;
2036
2037     out:
2038         if (err)
2039                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2040         return err;
2041 }
2042
2043 /* mirrored write */
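/*
 * Overall flow: wait until all older packets from the peer have been
 * processed (wait_for_and_update_peer_seq), read the payload into a peer
 * request, account it in the current epoch, resolve write conflicts in
 * dual-primary setups, decide which ack the peer expects (protocol A/B/C),
 * and finally submit the write to the local disk.
 */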
2044 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2045 {
2046         struct drbd_conf *mdev;
2047         sector_t sector;
2048         struct drbd_peer_request *peer_req;
2049         struct p_data *p = pi->data;
2050         u32 peer_seq = be32_to_cpu(p->seq_num);
2051         int rw = WRITE;
2052         u32 dp_flags;
2053         int err, tp;
2054
2055         mdev = vnr_to_mdev(tconn, pi->vnr);
2056         if (!mdev)
2057                 return -EIO;
2058
2059         if (!get_ldev(mdev)) {
2060                 int err2;
2061
2062                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2063                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2064                 atomic_inc(&mdev->current_epoch->epoch_size);
2065                 err2 = drbd_drain_block(mdev, pi->size);
2066                 if (!err)
2067                         err = err2;
2068                 return err;
2069         }
2070
2071         /*
2072          * Corresponding put_ldev done either below (on various errors), or in
2073          * drbd_peer_request_endio, if we successfully submit the data at the
2074          * end of this function.
2075          */
2076
2077         sector = be64_to_cpu(p->sector);
2078         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2079         if (!peer_req) {
2080                 put_ldev(mdev);
2081                 return -EIO;
2082         }
2083
2084         peer_req->w.cb = e_end_block;
2085
2086         dp_flags = be32_to_cpu(p->dp_flags);
2087         rw |= wire_flags_to_bio(mdev, dp_flags);
2088
2089         if (dp_flags & DP_MAY_SET_IN_SYNC)
2090                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2091
2092         spin_lock(&mdev->epoch_lock);
2093         peer_req->epoch = mdev->current_epoch;
2094         atomic_inc(&peer_req->epoch->epoch_size);
2095         atomic_inc(&peer_req->epoch->active);
2096         spin_unlock(&mdev->epoch_lock);
2097
2098         rcu_read_lock();
2099         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2100         rcu_read_unlock();
2101         if (tp) {
2102                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2103                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2104                 if (err)
2105                         goto out_interrupted;
2106                 spin_lock_irq(&mdev->tconn->req_lock);
2107                 err = handle_write_conflicts(mdev, peer_req);
2108                 if (err) {
2109                         spin_unlock_irq(&mdev->tconn->req_lock);
2110                         if (err == -ENOENT) {
2111                                 put_ldev(mdev);
2112                                 return 0;
2113                         }
2114                         goto out_interrupted;
2115                 }
2116         } else
2117                 spin_lock_irq(&mdev->tconn->req_lock);
2118         list_add(&peer_req->w.list, &mdev->active_ee);
2119         spin_unlock_irq(&mdev->tconn->req_lock);
2120
2121         if (mdev->tconn->agreed_pro_version < 100) {
2122                 rcu_read_lock();
2123                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2124                 case DRBD_PROT_C:
2125                         dp_flags |= DP_SEND_WRITE_ACK;
2126                         break;
2127                 case DRBD_PROT_B:
2128                         dp_flags |= DP_SEND_RECEIVE_ACK;
2129                         break;
2130                 }
2131                 rcu_read_unlock();
2132         }
2133
2134         if (dp_flags & DP_SEND_WRITE_ACK) {
2135                 peer_req->flags |= EE_SEND_WRITE_ACK;
2136                 inc_unacked(mdev);
2137                 /* corresponding dec_unacked() in e_end_block(),
2138                  * or in _drbd_clear_done_ee(), respectively */
2139         }
2140
2141         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2142                 /* I really don't like it that the receiver thread
2143                  * sends on the msock, but anyways */
2144                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2145         }
2146
2147         if (mdev->state.pdsk < D_INCONSISTENT) {
2148                 /* In case we have the only disk of the cluster,
                      * mark the range out of sync and do the activity log
                      * bookkeeping ourselves. */
2149                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2150                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2151                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2152                 drbd_al_begin_io(mdev, &peer_req->i);
2153         }
2154
2155         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2156         if (!err)
2157                 return 0;
2158
2159         /* don't care for the reason here */
2160         dev_err(DEV, "submit failed, triggering re-connect\n");
2161         spin_lock_irq(&mdev->tconn->req_lock);
2162         list_del(&peer_req->w.list);
2163         drbd_remove_epoch_entry_interval(mdev, peer_req);
2164         spin_unlock_irq(&mdev->tconn->req_lock);
2165         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2166                 drbd_al_complete_io(mdev, &peer_req->i);
2167
2168 out_interrupted:
2169         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2170         put_ldev(mdev);
2171         drbd_free_peer_req(mdev, peer_req);
2172         return err;
2173 }
2174
2175 /* We may throttle resync, if the lower device seems to be busy,
2176  * and current sync rate is above c_min_rate.
2177  *
2178  * To decide whether or not the lower device is busy, we use a scheme similar
2179  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2180  * activity (more than 64 sectors) that we cannot account for with our own
2181  * resync activity, the device obviously is "busy".
2182  *
2183  * The current sync rate used here uses only the most recent two step marks,
2184  * to have a short time average so we can react faster.
2185  */
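/*
 * Rough example (assuming the usual 4 KiB covered per bitmap bit): if the
 * last two sync marks show 2048 bits cleared over 4 seconds, the short-term
 * rate is Bit2KB(2048 / 4) == 2048 KiB/s; resync is throttled only while
 * this exceeds the configured c_min_rate.
 */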
2186 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2187 {
2188         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2189         unsigned long db, dt, dbdt;
2190         struct lc_element *tmp;
2191         int curr_events;
2192         int throttle = 0;
2193
2194         /* feature disabled? */
2195         if (mdev->ldev->dc.c_min_rate == 0)
2196                 return 0;
2197
2198         spin_lock_irq(&mdev->al_lock);
2199         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2200         if (tmp) {
2201                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2202                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2203                         spin_unlock_irq(&mdev->al_lock);
2204                         return 0;
2205                 }
2206                 /* Do not slow down if app IO is already waiting for this extent */
2207         }
2208         spin_unlock_irq(&mdev->al_lock);
2209
2210         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2211                       (int)part_stat_read(&disk->part0, sectors[1]) -
2212                         atomic_read(&mdev->rs_sect_ev);
2213
2214         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2215                 unsigned long rs_left;
2216                 int i;
2217
2218                 mdev->rs_last_events = curr_events;
2219
2220                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2221                  * approx. */
2222                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2223
2224                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2225                         rs_left = mdev->ov_left;
2226                 else
2227                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2228
2229                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2230                 if (!dt)
2231                         dt++;
2232                 db = mdev->rs_mark_left[i] - rs_left;
2233                 dbdt = Bit2KB(db/dt);
2234
2235                 if (dbdt > mdev->ldev->dc.c_min_rate)
2236                         throttle = 1;
2237         }
2238         return throttle;
2239 }
2240
2241
2242 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2243 {
2244         struct drbd_conf *mdev;
2245         sector_t sector;
2246         sector_t capacity;
2247         struct drbd_peer_request *peer_req;
2248         struct digest_info *di = NULL;
2249         int size, verb;
2250         unsigned int fault_type;
2251         struct p_block_req *p = pi->data;
2252
2253         mdev = vnr_to_mdev(tconn, pi->vnr);
2254         if (!mdev)
2255                 return -EIO;
2256         capacity = drbd_get_capacity(mdev->this_bdev);
2257
2258         sector = be64_to_cpu(p->sector);
2259         size   = be32_to_cpu(p->blksize);
2260
2261         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2262                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2263                                 (unsigned long long)sector, size);
2264                 return -EINVAL;
2265         }
2266         if (sector + (size>>9) > capacity) {
2267                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2268                                 (unsigned long long)sector, size);
2269                 return -EINVAL;
2270         }
2271
2272         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2273                 verb = 1;
2274                 switch (pi->cmd) {
2275                 case P_DATA_REQUEST:
2276                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2277                         break;
2278                 case P_RS_DATA_REQUEST:
2279                 case P_CSUM_RS_REQUEST:
2280                 case P_OV_REQUEST:
2281                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2282                         break;
2283                 case P_OV_REPLY:
2284                         verb = 0;
2285                         dec_rs_pending(mdev);
2286                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2287                         break;
2288                 default:
2289                         BUG();
2290                 }
2291                 if (verb && __ratelimit(&drbd_ratelimit_state))
2292                         dev_err(DEV, "Can not satisfy peer's read request, "
2293                             "no local data.\n");
2294
2295                 /* drain the possibly present payload */
2296                 return drbd_drain_block(mdev, pi->size);
2297         }
2298
2299         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2300          * "criss-cross" setup, that might cause write-out on some other DRBD,
2301          * which in turn might block on the other node at this very place.  */
2302         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2303         if (!peer_req) {
2304                 put_ldev(mdev);
2305                 return -ENOMEM;
2306         }
2307
2308         switch (pi->cmd) {
2309         case P_DATA_REQUEST:
2310                 peer_req->w.cb = w_e_end_data_req;
2311                 fault_type = DRBD_FAULT_DT_RD;
2312                 /* application IO, don't drbd_rs_begin_io */
2313                 goto submit;
2314
2315         case P_RS_DATA_REQUEST:
2316                 peer_req->w.cb = w_e_end_rsdata_req;
2317                 fault_type = DRBD_FAULT_RS_RD;
2318                 /* used in the sector offset progress display */
2319                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2320                 break;
2321
2322         case P_OV_REPLY:
2323         case P_CSUM_RS_REQUEST:
2324                 fault_type = DRBD_FAULT_RS_RD;
2325                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2326                 if (!di)
2327                         goto out_free_e;
2328
2329                 di->digest_size = pi->size;
2330                 di->digest = (((char *)di)+sizeof(struct digest_info));
2331
2332                 peer_req->digest = di;
2333                 peer_req->flags |= EE_HAS_DIGEST;
2334
2335                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2336                         goto out_free_e;
2337
2338                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2339                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2340                         peer_req->w.cb = w_e_end_csum_rs_req;
2341                         /* used in the sector offset progress display */
2342                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2343                 } else if (pi->cmd == P_OV_REPLY) {
2344                         /* track progress, we may need to throttle */
2345                         atomic_add(size >> 9, &mdev->rs_sect_in);
2346                         peer_req->w.cb = w_e_end_ov_reply;
2347                         dec_rs_pending(mdev);
2348                         /* drbd_rs_begin_io done when we sent this request,
2349                          * but accounting still needs to be done. */
2350                         goto submit_for_resync;
2351                 }
2352                 break;
2353
2354         case P_OV_REQUEST:
2355                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2356                     mdev->tconn->agreed_pro_version >= 90) {
2357                         unsigned long now = jiffies;
2358                         int i;
2359                         mdev->ov_start_sector = sector;
2360                         mdev->ov_position = sector;
2361                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2362                         mdev->rs_total = mdev->ov_left;
2363                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2364                                 mdev->rs_mark_left[i] = mdev->ov_left;
2365                                 mdev->rs_mark_time[i] = now;
2366                         }
2367                         dev_info(DEV, "Online Verify start sector: %llu\n",
2368                                         (unsigned long long)sector);
2369                 }
2370                 peer_req->w.cb = w_e_end_ov_req;
2371                 fault_type = DRBD_FAULT_RS_RD;
2372                 break;
2373
2374         default:
2375                 BUG();
2376         }
2377
2378         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2379          * wrt the receiver, but it is not as straightforward as it may seem.
2380          * Various places in the resync start and stop logic assume resync
2381          * requests are processed in order, requeuing this on the worker thread
2382          * introduces a bunch of new code for synchronization between threads.
2383          *
2384          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2385          * "forever", throttling after drbd_rs_begin_io will lock that extent
2386          * for application writes for the same time.  For now, just throttle
2387          * here, where the rest of the code expects the receiver to sleep for
2388          * a while, anyways.
2389          */
2390
2391         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2392          * this defers syncer requests for some time, before letting at least
2393          * one request through.  The resync controller on the receiving side
2394          * will adapt to the incoming rate accordingly.
2395          *
2396          * We cannot throttle here if remote is Primary/SyncTarget:
2397          * we would also throttle its application reads.
2398          * In that case, throttling is done on the SyncTarget only.
2399          */
2400         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2401                 schedule_timeout_uninterruptible(HZ/10);
2402         if (drbd_rs_begin_io(mdev, sector))
2403                 goto out_free_e;
2404
2405 submit_for_resync:
2406         atomic_add(size >> 9, &mdev->rs_sect_ev);
2407
2408 submit:
2409         inc_unacked(mdev);
2410         spin_lock_irq(&mdev->tconn->req_lock);
2411         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2412         spin_unlock_irq(&mdev->tconn->req_lock);
2413
2414         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2415                 return 0;
2416
2417         /* don't care for the reason here */
2418         dev_err(DEV, "submit failed, triggering re-connect\n");
2419         spin_lock_irq(&mdev->tconn->req_lock);
2420         list_del(&peer_req->w.list);
2421         spin_unlock_irq(&mdev->tconn->req_lock);
2422         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2423
2424 out_free_e:
2425         put_ldev(mdev);
2426         drbd_free_peer_req(mdev, peer_req);
2427         return -EIO;
2428 }
2429
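/*
 * After-split-brain recovery helpers.  The _0p/_1p/_2p variants correspond
 * to zero, one or two primaries at the time of the handshake (see
 * drbd_sync_handshake()).  Return convention: 1 means the peer's data is
 * discarded (we become sync source), -1 means our data is discarded (we
 * become sync target), -100 means no automatic decision could be reached.
 */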
2430 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2431 {
2432         int self, peer, rv = -100;
2433         unsigned long ch_self, ch_peer;
2434         enum drbd_after_sb_p after_sb_0p;
2435
2436         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2437         peer = mdev->p_uuid[UI_BITMAP] & 1;
2438
2439         ch_peer = mdev->p_uuid[UI_SIZE];
2440         ch_self = mdev->comm_bm_set;
2441
2442         rcu_read_lock();
2443         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2444         rcu_read_unlock();
2445         switch (after_sb_0p) {
2446         case ASB_CONSENSUS:
2447         case ASB_DISCARD_SECONDARY:
2448         case ASB_CALL_HELPER:
2449         case ASB_VIOLENTLY:
2450                 dev_err(DEV, "Configuration error.\n");
2451                 break;
2452         case ASB_DISCONNECT:
2453                 break;
2454         case ASB_DISCARD_YOUNGER_PRI:
2455                 if (self == 0 && peer == 1) {
2456                         rv = -1;
2457                         break;
2458                 }
2459                 if (self == 1 && peer == 0) {
2460                         rv =  1;
2461                         break;
2462                 }
2463                 /* Else fall through to one of the other strategies... */
2464         case ASB_DISCARD_OLDER_PRI:
2465                 if (self == 0 && peer == 1) {
2466                         rv = 1;
2467                         break;
2468                 }
2469                 if (self == 1 && peer == 0) {
2470                         rv = -1;
2471                         break;
2472                 }
2473                 /* Else fall through to one of the other strategies... */
2474                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2475                      "Using discard-least-changes instead\n");
2476         case ASB_DISCARD_ZERO_CHG:
2477                 if (ch_peer == 0 && ch_self == 0) {
2478                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2479                                 ? -1 : 1;
2480                         break;
2481                 } else {
2482                         if (ch_peer == 0) { rv =  1; break; }
2483                         if (ch_self == 0) { rv = -1; break; }
2484                 }
2485                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2486                         break;
2487         case ASB_DISCARD_LEAST_CHG:
2488                 if      (ch_self < ch_peer)
2489                         rv = -1;
2490                 else if (ch_self > ch_peer)
2491                         rv =  1;
2492                 else /* ( ch_self == ch_peer ) */
2493                      /* Well, then use something else. */
2494                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2495                                 ? -1 : 1;
2496                 break;
2497         case ASB_DISCARD_LOCAL:
2498                 rv = -1;
2499                 break;
2500         case ASB_DISCARD_REMOTE:
2501                 rv =  1;
2502         }
2503
2504         return rv;
2505 }
2506
2507 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2508 {
2509         int hg, rv = -100;
2510         enum drbd_after_sb_p after_sb_1p;
2511
2512         rcu_read_lock();
2513         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2514         rcu_read_unlock();
2515         switch (after_sb_1p) {
2516         case ASB_DISCARD_YOUNGER_PRI:
2517         case ASB_DISCARD_OLDER_PRI:
2518         case ASB_DISCARD_LEAST_CHG:
2519         case ASB_DISCARD_LOCAL:
2520         case ASB_DISCARD_REMOTE:
2521         case ASB_DISCARD_ZERO_CHG:
2522                 dev_err(DEV, "Configuration error.\n");
2523                 break;
2524         case ASB_DISCONNECT:
2525                 break;
2526         case ASB_CONSENSUS:
2527                 hg = drbd_asb_recover_0p(mdev);
2528                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2529                         rv = hg;
2530                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2531                         rv = hg;
2532                 break;
2533         case ASB_VIOLENTLY:
2534                 rv = drbd_asb_recover_0p(mdev);
2535                 break;
2536         case ASB_DISCARD_SECONDARY:
2537                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2538         case ASB_CALL_HELPER:
2539                 hg = drbd_asb_recover_0p(mdev);
2540                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2541                         enum drbd_state_rv rv2;
2542
2543                         drbd_set_role(mdev, R_SECONDARY, 0);
2544                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2545                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2546                           * we do not need to wait for the after state change work either. */
2547                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2548                         if (rv2 != SS_SUCCESS) {
2549                                 drbd_khelper(mdev, "pri-lost-after-sb");
2550                         } else {
2551                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2552                                 rv = hg;
2553                         }
2554                 } else
2555                         rv = hg;
2556         }
2557
2558         return rv;
2559 }
2560
2561 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2562 {
2563         int hg, rv = -100;
2564         enum drbd_after_sb_p after_sb_2p;
2565
2566         rcu_read_lock();
2567         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2568         rcu_read_unlock();
2569         switch (after_sb_2p) {
2570         case ASB_DISCARD_YOUNGER_PRI:
2571         case ASB_DISCARD_OLDER_PRI:
2572         case ASB_DISCARD_LEAST_CHG:
2573         case ASB_DISCARD_LOCAL:
2574         case ASB_DISCARD_REMOTE:
2575         case ASB_CONSENSUS:
2576         case ASB_DISCARD_SECONDARY:
2577         case ASB_DISCARD_ZERO_CHG:
2578                 dev_err(DEV, "Configuration error.\n");
2579                 break;
2580         case ASB_VIOLENTLY:
2581                 rv = drbd_asb_recover_0p(mdev);
2582                 break;
2583         case ASB_DISCONNECT:
2584                 break;
2585         case ASB_CALL_HELPER:
2586                 hg = drbd_asb_recover_0p(mdev);
2587                 if (hg == -1) {
2588                         enum drbd_state_rv rv2;
2589
2590                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2591                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2592                           * we do not need to wait for the after state change work either. */
2593                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2594                         if (rv2 != SS_SUCCESS) {
2595                                 drbd_khelper(mdev, "pri-lost-after-sb");
2596                         } else {
2597                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2598                                 rv = hg;
2599                         }
2600                 } else
2601                         rv = hg;
2602         }
2603
2604         return rv;
2605 }
2606
2607 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2608                            u64 bits, u64 flags)
2609 {
2610         if (!uuid) {
2611                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2612                 return;
2613         }
2614         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2615              text,
2616              (unsigned long long)uuid[UI_CURRENT],
2617              (unsigned long long)uuid[UI_BITMAP],
2618              (unsigned long long)uuid[UI_HISTORY_START],
2619              (unsigned long long)uuid[UI_HISTORY_END],
2620              (unsigned long long)bits,
2621              (unsigned long long)flags);
2622 }
2623
2624 /*
2625   100   after split brain try auto recover
2626     2   C_SYNC_SOURCE set BitMap
2627     1   C_SYNC_SOURCE use BitMap
2628     0   no Sync
2629    -1   C_SYNC_TARGET use BitMap
2630    -2   C_SYNC_TARGET set BitMap
2631  -100   after split brain, disconnect
2632 -1000   unrelated data
2633 -1091   requires proto 91
2634 -1096   requires proto 96
2635  */
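/*
 * Note: the lowest bit of each UUID is used as a flag bit (it is what the
 * younger/older-primary decision in drbd_asb_recover_0p() looks at), so the
 * comparisons below mask it off with "& ~((u64)1)" before testing equality.
 */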
2636 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2637 {
2638         u64 self, peer;
2639         int i, j;
2640
2641         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2642         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2643
2644         *rule_nr = 10;
2645         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2646                 return 0;
2647
2648         *rule_nr = 20;
2649         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2650              peer != UUID_JUST_CREATED)
2651                 return -2;
2652
2653         *rule_nr = 30;
2654         if (self != UUID_JUST_CREATED &&
2655             (peer == UUID_JUST_CREATED || peer == (u64)0))
2656                 return 2;
2657
2658         if (self == peer) {
2659                 int rct, dc; /* roles at crash time */
2660
2661                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2662
2663                         if (mdev->tconn->agreed_pro_version < 91)
2664                                 return -1091;
2665
2666                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2667                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2668                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2669                                 drbd_uuid_set_bm(mdev, 0UL);
2670
2671                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2672                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2673                                 *rule_nr = 34;
2674                         } else {
2675                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2676                                 *rule_nr = 36;
2677                         }
2678
2679                         return 1;
2680                 }
2681
2682                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2683
2684                         if (mdev->tconn->agreed_pro_version < 91)
2685                                 return -1091;
2686
2687                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2688                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2689                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2690
2691                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2692                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2693                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2694
2695                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2696                                 *rule_nr = 35;
2697                         } else {
2698                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2699                                 *rule_nr = 37;
2700                         }
2701
2702                         return -1;
2703                 }
2704
2705                 /* Common power [off|failure] */
2706                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2707                         (mdev->p_uuid[UI_FLAGS] & 2);
2708                 /* lowest bit is set when we were primary,
2709                  * next bit (weight 2) is set when peer was primary */
2710                 *rule_nr = 40;
2711
2712                 switch (rct) {
2713                 case 0: /* !self_pri && !peer_pri */ return 0;
2714                 case 1: /*  self_pri && !peer_pri */ return 1;
2715                 case 2: /* !self_pri &&  peer_pri */ return -1;
2716                 case 3: /*  self_pri &&  peer_pri */
2717                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2718                         return dc ? -1 : 1;
2719                 }
2720         }
2721
2722         *rule_nr = 50;
2723         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2724         if (self == peer)
2725                 return -1;
2726
2727         *rule_nr = 51;
2728         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2729         if (self == peer) {
2730                 if (mdev->tconn->agreed_pro_version < 96 ?
2731                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2732                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2733                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2734                         /* The last P_SYNC_UUID did not get through. Undo the last
2735                            start-of-resync-as-sync-source modifications of the peer's UUIDs. */
2736
2737                         if (mdev->tconn->agreed_pro_version < 91)
2738                                 return -1091;
2739
2740                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2741                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2742
2743                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2744                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2745
2746                         return -1;
2747                 }
2748         }
2749
2750         *rule_nr = 60;
2751         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2752         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2753                 peer = mdev->p_uuid[i] & ~((u64)1);
2754                 if (self == peer)
2755                         return -2;
2756         }
2757
2758         *rule_nr = 70;
2759         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2760         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2761         if (self == peer)
2762                 return 1;
2763
2764         *rule_nr = 71;
2765         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2766         if (self == peer) {
2767                 if (mdev->tconn->agreed_pro_version < 96 ?
2768                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2769                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2770                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2771                         /* The last P_SYNC_UUID did not get through. Undo the last
2772                            start-of-resync-as-sync-source modifications of our own UUIDs. */
2773
2774                         if (mdev->tconn->agreed_pro_version < 91)
2775                                 return -1091;
2776
2777                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2778                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2779
2780                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2781                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2782                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2783
2784                         return 1;
2785                 }
2786         }
2787
2788
2789         *rule_nr = 80;
2790         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2791         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2792                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2793                 if (self == peer)
2794                         return 2;
2795         }
2796
2797         *rule_nr = 90;
2798         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2799         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2800         if (self == peer && self != ((u64)0))
2801                 return 100;
2802
2803         *rule_nr = 100;
2804         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2805                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2806                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2807                         peer = mdev->p_uuid[j] & ~((u64)1);
2808                         if (self == peer)
2809                                 return -100;
2810                 }
2811         }
2812
2813         return -1000;
2814 }
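
/*
 * Rough summary of how drbd_sync_handshake() below interprets the value
 * returned here:
 *        0   data is identical, no resync necessary
 *    1 / -1  become sync source / sync target (bitmap based resync)
 *    2 / -2  become sync source / sync target, full resync
 *  100 /-100 split brain detected, try the after-split-brain policies
 *    -1000   unrelated data, refuse to connect
 *   < -1000  peer needs to support at least protocol (-rv - 1000), e.g. -1091
 * *rule_nr records which comparison rule decided, for the log message.
 */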
2815
2816 /* drbd_sync_handshake() returns the new conn state on success, or
2817    C_MASK on failure.
2818  */
2819 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2820                                            enum drbd_disk_state peer_disk) __must_hold(local)
2821 {
2822         enum drbd_conns rv = C_MASK;
2823         enum drbd_disk_state mydisk;
2824         struct net_conf *nc;
2825         int hg, rule_nr, rr_conflict, dry_run;
2826
2827         mydisk = mdev->state.disk;
2828         if (mydisk == D_NEGOTIATING)
2829                 mydisk = mdev->new_state_tmp.disk;
2830
2831         dev_info(DEV, "drbd_sync_handshake:\n");
2832         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2833         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2834                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2835
2836         hg = drbd_uuid_compare(mdev, &rule_nr);
2837
2838         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2839
2840         if (hg == -1000) {
2841                 dev_alert(DEV, "Unrelated data, aborting!\n");
2842                 return C_MASK;
2843         }
2844         if (hg < -1000) {
2845                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2846                 return C_MASK;
2847         }
2848
2849         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2850             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2851                 int f = (hg == -100) || abs(hg) == 2;
2852                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2853                 if (f)
2854                         hg = hg*2;
2855                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2856                      hg > 0 ? "source" : "target");
2857         }
2858
2859         if (abs(hg) == 100)
2860                 drbd_khelper(mdev, "initial-split-brain");
2861
2862         rcu_read_lock();
2863         nc = rcu_dereference(mdev->tconn->net_conf);
2864
2865         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2866                 int pcount = (mdev->state.role == R_PRIMARY)
2867                            + (peer_role == R_PRIMARY);
2868                 int forced = (hg == -100);
2869
2870                 switch (pcount) {
2871                 case 0:
2872                         hg = drbd_asb_recover_0p(mdev);
2873                         break;
2874                 case 1:
2875                         hg = drbd_asb_recover_1p(mdev);
2876                         break;
2877                 case 2:
2878                         hg = drbd_asb_recover_2p(mdev);
2879                         break;
2880                 }
2881                 if (abs(hg) < 100) {
2882                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2883                              "automatically solved. Sync from %s node\n",
2884                              pcount, (hg < 0) ? "peer" : "this");
2885                         if (forced) {
2886                                 dev_warn(DEV, "Doing a full sync, since"
2887                                      " UUIDs were ambiguous.\n");
2888                                 hg = hg*2;
2889                         }
2890                 }
2891         }
2892
2893         if (hg == -100) {
2894                 if (nc->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2895                         hg = -1;
2896                 if (!nc->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2897                         hg = 1;
2898
2899                 if (abs(hg) < 100)
2900                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2901                              "Sync from %s node\n",
2902                              (hg < 0) ? "peer" : "this");
2903         }
2904         rr_conflict = nc->rr_conflict;
2905         dry_run = nc->dry_run;
2906         rcu_read_unlock();
2907
2908         if (hg == -100) {
2909                 /* FIXME this log message is not correct if we end up here
2910                  * after an attempted attach on a diskless node.
2911                  * We just refuse to attach -- well, we drop the "connection"
2912                  * to that disk, in a way... */
2913                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2914                 drbd_khelper(mdev, "split-brain");
2915                 return C_MASK;
2916         }
2917
2918         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2919                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2920                 return C_MASK;
2921         }
2922
2923         if (hg < 0 && /* by intention we do not use mydisk here. */
2924             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2925                 switch (rr_conflict) {
2926                 case ASB_CALL_HELPER:
2927                         drbd_khelper(mdev, "pri-lost");
2928                         /* fall through */
2929                 case ASB_DISCONNECT:
2930                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2931                         return C_MASK;
2932                 case ASB_VIOLENTLY:
2933                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2934                              "assumption\n");
2935                 }
2936         }
2937
2938         if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2939                 if (hg == 0)
2940                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2941                 else
2942                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2943                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2944                                  abs(hg) >= 2 ? "full" : "bit-map based");
2945                 return C_MASK;
2946         }
2947
2948         if (abs(hg) >= 2) {
2949                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2950                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2951                                         BM_LOCKED_SET_ALLOWED))
2952                         return C_MASK;
2953         }
2954
2955         if (hg > 0) { /* become sync source. */
2956                 rv = C_WF_BITMAP_S;
2957         } else if (hg < 0) { /* become sync target */
2958                 rv = C_WF_BITMAP_T;
2959         } else {
2960                 rv = C_CONNECTED;
2961                 if (drbd_bm_total_weight(mdev)) {
2962                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2963                              drbd_bm_total_weight(mdev));
2964                 }
2965         }
2966
2967         return rv;
2968 }
2969
2970 /* returns 1 if invalid */
2971 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2972 {
2973         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2974         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2975             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2976                 return 0;
2977
2978         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2979         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2980             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2981                 return 1;
2982
2983         /* everything else is valid if they are equal on both sides. */
2984         if (peer == self)
2985                 return 0;
2986
2987         /* everything else is invalid. */
2988         return 1;
2989 }
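
/*
 * Example: peer configured discard-remote and self configured discard-local
 * is the one compatible asymmetric pair; both sides agree that this node's
 * data is the one to be discarded.  Both sides configured discard-remote,
 * in contrast, would have each node discarding the other's data, so every
 * other combination involving discard-local/discard-remote is rejected.
 */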
2990
2991 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2992 {
2993         struct p_protocol *p = pi->data;
2994         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2995         int p_want_lose, p_two_primaries, cf;
2996         struct net_conf *nc;
2997
2998         p_proto         = be32_to_cpu(p->protocol);
2999         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3000         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3001         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3002         p_two_primaries = be32_to_cpu(p->two_primaries);
3003         cf              = be32_to_cpu(p->conn_flags);
3004         p_want_lose = cf & CF_WANT_LOSE;
3005
3006         if (tconn->agreed_pro_version >= 87) {
3007                 char integrity_alg[SHARED_SECRET_MAX];
3008                 struct crypto_hash *tfm = NULL;
3009                 int err;
3010
3011                 if (pi->size > sizeof(integrity_alg))
3012                         return -EIO;
3013                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3014                 if (err)
3015                         return err;
3016                 integrity_alg[SHARED_SECRET_MAX-1] = 0;
3017
3018                 if (integrity_alg[0]) {
3019                         tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3020                         if (IS_ERR(tfm)) {
3021                                 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3022                                          integrity_alg);
3023                                 goto disconnect;
3024                         }
3025                         conn_info(tconn, "peer data-integrity-alg: %s\n", integrity_alg);
3026                 }
3027
3028                 if (tconn->peer_integrity_tfm)
3029                         crypto_free_hash(tconn->peer_integrity_tfm);
3030                 tconn->peer_integrity_tfm = tfm;
3031         }
3032
3033         clear_bit(CONN_DRY_RUN, &tconn->flags);
3034
3035         if (cf & CF_DRY_RUN)
3036                 set_bit(CONN_DRY_RUN, &tconn->flags);
3037
3038         rcu_read_lock();
3039         nc = rcu_dereference(tconn->net_conf);
3040
3041         if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
3042                 conn_err(tconn, "incompatible communication protocols\n");
3043                 goto disconnect_rcu_unlock;
3044         }
3045
3046         if (cmp_after_sb(p_after_sb_0p, nc->after_sb_0p)) {
3047                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3048                 goto disconnect_rcu_unlock;
3049         }
3050
3051         if (cmp_after_sb(p_after_sb_1p, nc->after_sb_1p)) {
3052                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3053                 goto disconnect_rcu_unlock;
3054         }
3055
3056         if (cmp_after_sb(p_after_sb_2p, nc->after_sb_2p)) {
3057                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3058                 goto disconnect_rcu_unlock;
3059         }
3060
3061         if (p_want_lose && nc->want_lose) {
3062                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
3063                 goto disconnect_rcu_unlock;
3064         }
3065
3066         if (p_two_primaries != nc->two_primaries) {
3067                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3068                 goto disconnect_rcu_unlock;
3069         }
3070
3071         rcu_read_unlock();
3072
3073         return 0;
3074
3075 disconnect_rcu_unlock:
3076         rcu_read_unlock();
3077 disconnect:
3078         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3079         return -EIO;
3080 }
3081
3082 /* helper function
3083  * input: alg name, feature name
3084  * return: NULL (alg name was "")
3085  *         ERR_PTR(error) if something goes wrong
3086  *         or the crypto hash ptr, if it worked out ok. */
3087 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3088                 const char *alg, const char *name)
3089 {
3090         struct crypto_hash *tfm;
3091
3092         if (!alg[0])
3093                 return NULL;
3094
3095         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3096         if (IS_ERR(tfm)) {
3097                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3098                         alg, name, PTR_ERR(tfm));
3099                 return tfm;
3100         }
3101         return tfm;
3102 }
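
/*
 * Sketch of the expected caller pattern (see receive_SyncParam() below):
 *
 *	tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
 *	if (IS_ERR(tfm))
 *		goto disconnect;	(allocation failed, drop the connection)
 *
 * A NULL return simply means no algorithm was requested; only a real pointer
 * is a usable hash transform, which eventually needs crypto_free_hash().
 */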
3103
3104 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3105 {
3106         void *buffer = tconn->data.rbuf;
3107         int size = pi->size;
3108
3109         while (size) {
3110                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3111                 s = drbd_recv(tconn, buffer, s);
3112                 if (s <= 0) {
3113                         if (s < 0)
3114                                 return s;
3115                         break;
3116                 }
3117                 size -= s;
3118         }
3119         if (size)
3120                 return -EIO;
3121         return 0;
3122 }
3123
3124 /*
3125  * config_unknown_volume  -  device configuration command for unknown volume
3126  *
3127  * When a device is added to an existing connection, the node on which the
3128  * device is added first will send configuration commands to its peer but the
3129  * peer will not know about the device yet.  It will warn and ignore these
3130  * commands.  Once the device is added on the second node, the second node will
3131  * send the same device configuration commands, but in the other direction.
3132  *
3133  * (We can also end up here if drbd is misconfigured.)
3134  */
3135 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3136 {
3137         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3138                   pi->vnr, cmdname(pi->cmd));
3139         return ignore_remaining_packet(tconn, pi);
3140 }
3141
3142 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3143 {
3144         struct drbd_conf *mdev;
3145         struct p_rs_param_95 *p;
3146         unsigned int header_size, data_size, exp_max_sz;
3147         struct crypto_hash *verify_tfm = NULL;
3148         struct crypto_hash *csums_tfm = NULL;
3149         struct net_conf *old_conf, *new_conf = NULL;
3150         const int apv = tconn->agreed_pro_version;
3151         int *rs_plan_s = NULL;
3152         int fifo_size = 0;
3153         int err;
3154
3155         mdev = vnr_to_mdev(tconn, pi->vnr);
3156         if (!mdev)
3157                 return config_unknown_volume(tconn, pi);
3158
3159         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3160                     : apv == 88 ? sizeof(struct p_rs_param)
3161                                         + SHARED_SECRET_MAX
3162                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3163                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3164
3165         if (pi->size > exp_max_sz) {
3166                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3167                     pi->size, exp_max_sz);
3168                 return -EIO;
3169         }
3170
3171         if (apv <= 88) {
3172                 header_size = sizeof(struct p_rs_param);
3173                 data_size = pi->size - header_size;
3174         } else if (apv <= 94) {
3175                 header_size = sizeof(struct p_rs_param_89);
3176                 data_size = pi->size - header_size;
3177                 D_ASSERT(data_size == 0);
3178         } else {
3179                 header_size = sizeof(struct p_rs_param_95);
3180                 data_size = pi->size - header_size;
3181                 D_ASSERT(data_size == 0);
3182         }
3183
3184         /* initialize verify_alg and csums_alg */
3185         p = pi->data;
3186         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3187
3188         err = drbd_recv_all(mdev->tconn, p, header_size);
3189         if (err)
3190                 return err;
3191
3192         if (get_ldev(mdev)) {
3193                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3194                 put_ldev(mdev);
3195         }
3196
3197         if (apv >= 88) {
3198                 if (apv == 88) {
3199                         if (data_size > SHARED_SECRET_MAX) {
3200                                 dev_err(DEV, "verify-alg too long, "
3201                                     "peer wants %u, accepting only %u bytes\n",
3202                                                 data_size, SHARED_SECRET_MAX);
3203                                 return -EIO;
3204                         }
3205
3206                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3207                         if (err)
3208                                 return err;
3209
3210                         /* we expect NUL terminated string */
3211                         /* but just in case someone tries to be evil */
3212                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3213                         p->verify_alg[data_size-1] = 0;
3214
3215                 } else /* apv >= 89 */ {
3216                         /* we still expect NUL terminated strings */
3217                         /* but just in case someone tries to be evil */
3218                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3219                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3220                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3221                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3222                 }
3223
3224                 mutex_lock(&mdev->tconn->net_conf_update);
3225                 old_conf = mdev->tconn->net_conf;
3226
3227                 if (strcmp(old_conf->verify_alg, p->verify_alg)) {
3228                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3229                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3230                                     old_conf->verify_alg, p->verify_alg);
3231                                 goto disconnect;
3232                         }
3233                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3234                                         p->verify_alg, "verify-alg");
3235                         if (IS_ERR(verify_tfm)) {
3236                                 verify_tfm = NULL;
3237                                 goto disconnect;
3238                         }
3239                 }
3240
3241                 if (apv >= 89 && strcmp(old_conf->csums_alg, p->csums_alg)) {
3242                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3243                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3244                                     old_conf->csums_alg, p->csums_alg);
3245                                 goto disconnect;
3246                         }
3247                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3248                                         p->csums_alg, "csums-alg");
3249                         if (IS_ERR(csums_tfm)) {
3250                                 csums_tfm = NULL;
3251                                 goto disconnect;
3252                         }
3253                 }
3254
3255                 if (apv > 94 && get_ldev(mdev)) {
3256                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3257                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3258                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3259                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3260                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3261
3262                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3263                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3264                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3265                                 if (!rs_plan_s) {
3266                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3267                                         put_ldev(mdev);
3268                                         goto disconnect;
3269                                 }
3270                         }
3271                         put_ldev(mdev);
3272                 }
3273
3274                 if (verify_tfm || csums_tfm) {
3275                         new_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3276                         if (!new_conf) {
3277                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3278                                 goto disconnect;
3279                         }
3280
3281                         *new_conf = *old_conf;
3282
3283                         if (verify_tfm) {
3284                                 strcpy(new_conf->verify_alg, p->verify_alg);
3285                                 new_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3286                                 crypto_free_hash(mdev->tconn->verify_tfm);
3287                                 mdev->tconn->verify_tfm = verify_tfm;
3288                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3289                         }
3290                         if (csums_tfm) {
3291                                 strcpy(new_conf->csums_alg, p->csums_alg);
3292                                 new_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3293                                 crypto_free_hash(mdev->tconn->csums_tfm);
3294                                 mdev->tconn->csums_tfm = csums_tfm;
3295                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3296                         }
3297                         rcu_assign_pointer(tconn->net_conf, new_conf);
3298                 }
3299                 mutex_unlock(&mdev->tconn->net_conf_update);
3300                 if (new_conf) {
3301                         synchronize_rcu();
3302                         kfree(old_conf);
3303                 }
3304
3305                 spin_lock(&mdev->peer_seq_lock);
3306                 if (fifo_size != mdev->rs_plan_s.size) {
3307                         kfree(mdev->rs_plan_s.values);
3308                         mdev->rs_plan_s.values = rs_plan_s;
3309                         mdev->rs_plan_s.size   = fifo_size;
3310                         mdev->rs_planed = 0;
3311                 }
3312                 spin_unlock(&mdev->peer_seq_lock);
3313         }
3314         return 0;
3315
3316 disconnect:
3317         mutex_unlock(&mdev->tconn->net_conf_update);
3318         /* just for completeness: actually not needed,
3319          * as this is not reached if csums_tfm was ok. */
3320         crypto_free_hash(csums_tfm);
3321         /* but free the verify_tfm again, if csums_tfm did not work out */
3322         crypto_free_hash(verify_tfm);
3323         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3324         return -EIO;
3325 }
3326
3327 /* warn if the arguments differ by more than 12.5% */
3328 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3329         const char *s, sector_t a, sector_t b)
3330 {
3331         sector_t d;
3332         if (a == 0 || b == 0)
3333                 return;
3334         d = (a > b) ? (a - b) : (b - a);
3335         if (d > (a>>3) || d > (b>>3))
3336                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3337                      (unsigned long long)a, (unsigned long long)b);
3338 }
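
/*
 * Worked example for the check above: a = 1000, b = 880 sectors gives
 * d = 120 > (b >> 3) = 110, i.e. more than ~12.5% of the smaller value,
 * so the warning is printed.  With b = 900, d = 100 is neither larger than
 * a >> 3 = 125 nor b >> 3 = 112, and nothing is logged.
 */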
3339
3340 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3341 {
3342         struct drbd_conf *mdev;
3343         struct p_sizes *p = pi->data;
3344         enum determine_dev_size dd = unchanged;
3345         sector_t p_size, p_usize, my_usize;
3346         int ldsc = 0; /* local disk size changed */
3347         enum dds_flags ddsf;
3348
3349         mdev = vnr_to_mdev(tconn, pi->vnr);
3350         if (!mdev)
3351                 return config_unknown_volume(tconn, pi);
3352
3353         p_size = be64_to_cpu(p->d_size);
3354         p_usize = be64_to_cpu(p->u_size);
3355
3356         /* just store the peer's disk size for now.
3357          * we still need to figure out whether we accept that. */
3358         mdev->p_size = p_size;
3359
3360         if (get_ldev(mdev)) {
3361                 warn_if_differ_considerably(mdev, "lower level device sizes",
3362                            p_size, drbd_get_max_capacity(mdev->ldev));
3363                 warn_if_differ_considerably(mdev, "user requested size",
3364                                             p_usize, mdev->ldev->dc.disk_size);
3365
3366                 /* if this is the first connect, or an otherwise expected
3367                  * param exchange, choose the minimum */
3368                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3369                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3370                                              p_usize);
3371
3372                 my_usize = mdev->ldev->dc.disk_size;
3373
3374                 if (mdev->ldev->dc.disk_size != p_usize) {
3375                         mdev->ldev->dc.disk_size = p_usize;
3376                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3377                              (unsigned long)mdev->ldev->dc.disk_size);
3378                 }
3379
3380                 /* Never shrink a device with usable data during connect.
3381                    But allow online shrinking if we are connected. */
3382                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3383                    drbd_get_capacity(mdev->this_bdev) &&
3384                    mdev->state.disk >= D_OUTDATED &&
3385                    mdev->state.conn < C_CONNECTED) {
3386                         dev_err(DEV, "The peer's disk size is too small!\n");
3387                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3388                         mdev->ldev->dc.disk_size = my_usize;
3389                         put_ldev(mdev);
3390                         return -EIO;
3391                 }
3392                 put_ldev(mdev);
3393         }
3394
3395         ddsf = be16_to_cpu(p->dds_flags);
3396         if (get_ldev(mdev)) {
3397                 dd = drbd_determine_dev_size(mdev, ddsf);
3398                 put_ldev(mdev);
3399                 if (dd == dev_size_error)
3400                         return -EIO;
3401                 drbd_md_sync(mdev);
3402         } else {
3403                 /* I am diskless, need to accept the peer's size. */
3404                 drbd_set_my_capacity(mdev, p_size);
3405         }
3406
3407         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3408         drbd_reconsider_max_bio_size(mdev);
3409
3410         if (get_ldev(mdev)) {
3411                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3412                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3413                         ldsc = 1;
3414                 }
3415
3416                 put_ldev(mdev);
3417         }
3418
3419         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3420                 if (be64_to_cpu(p->c_size) !=
3421                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3422                         /* we have different sizes, probably peer
3423                          * needs to know my new size... */
3424                         drbd_send_sizes(mdev, 0, ddsf);
3425                 }
3426                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3427                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3428                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3429                             mdev->state.disk >= D_INCONSISTENT) {
3430                                 if (ddsf & DDSF_NO_RESYNC)
3431                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3432                                 else
3433                                         resync_after_online_grow(mdev);
3434                         } else
3435                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3436                 }
3437         }
3438
3439         return 0;
3440 }
3441
3442 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3443 {
3444         struct drbd_conf *mdev;
3445         struct p_uuids *p = pi->data;
3446         u64 *p_uuid;
3447         int i, updated_uuids = 0;
3448
3449         mdev = vnr_to_mdev(tconn, pi->vnr);
3450         if (!mdev)
3451                 return config_unknown_volume(tconn, pi);
3452
3453         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3454
3455         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3456                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3457
3458         kfree(mdev->p_uuid);
3459         mdev->p_uuid = p_uuid;
3460
3461         if (mdev->state.conn < C_CONNECTED &&
3462             mdev->state.disk < D_INCONSISTENT &&
3463             mdev->state.role == R_PRIMARY &&
3464             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3465                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3466                     (unsigned long long)mdev->ed_uuid);
3467                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3468                 return -EIO;
3469         }
3470
3471         if (get_ldev(mdev)) {
3472                 int skip_initial_sync =
3473                         mdev->state.conn == C_CONNECTED &&
3474                         mdev->tconn->agreed_pro_version >= 90 &&
3475                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3476                         (p_uuid[UI_FLAGS] & 8);
3477                 if (skip_initial_sync) {
3478                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3479                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3480                                         "clear_n_write from receive_uuids",
3481                                         BM_LOCKED_TEST_ALLOWED);
3482                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3483                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3484                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3485                                         CS_VERBOSE, NULL);
3486                         drbd_md_sync(mdev);
3487                         updated_uuids = 1;
3488                 }
3489                 put_ldev(mdev);
3490         } else if (mdev->state.disk < D_INCONSISTENT &&
3491                    mdev->state.role == R_PRIMARY) {
3492                 /* I am a diskless primary, the peer just created a new current UUID
3493                    for me. */
3494                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3495         }
3496
3497         /* Before we test for the disk state, we should wait until a possibly
3498            ongoing cluster-wide state change has finished. That is important if
3499            we are primary and are detaching from our disk. We need to see the
3500            new disk state... */
3501         mutex_lock(mdev->state_mutex);
3502         mutex_unlock(mdev->state_mutex);
3503         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3504                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3505
3506         if (updated_uuids)
3507                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3508
3509         return 0;
3510 }
3511
3512 /**
3513  * convert_state() - Converts the peer's view of the cluster state to our point of view
3514  * @ps:         The state as seen by the peer.
3515  */
3516 static union drbd_state convert_state(union drbd_state ps)
3517 {
3518         union drbd_state ms;
3519
3520         static enum drbd_conns c_tab[] = {
3521                 [C_CONNECTED] = C_CONNECTED,
3522
3523                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3524                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3525                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3526                 [C_VERIFY_S]       = C_VERIFY_T,
3527                 [C_MASK]   = C_MASK,
3528         };
3529
3530         ms.i = ps.i;
3531
3532         ms.conn = c_tab[ps.conn];
3533         ms.peer = ps.role;
3534         ms.role = ps.peer;
3535         ms.pdsk = ps.disk;
3536         ms.disk = ps.pdsk;
3537         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3538
3539         return ms;
3540 }
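
/*
 * Example: a state change request of { role=Primary, conn=StartingSyncS,
 * disk=UpToDate, pdsk=Inconsistent } as seen by the peer is converted to
 * { peer=Primary, conn=StartingSyncT, pdsk=UpToDate, disk=Inconsistent }
 * from our point of view: role/peer and disk/pdsk are swapped, and the
 * asymmetric connection states are mirrored via c_tab.
 */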
3541
3542 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3543 {
3544         struct drbd_conf *mdev;
3545         struct p_req_state *p = pi->data;
3546         union drbd_state mask, val;
3547         enum drbd_state_rv rv;
3548
3549         mdev = vnr_to_mdev(tconn, pi->vnr);
3550         if (!mdev)
3551                 return -EIO;
3552
3553         mask.i = be32_to_cpu(p->mask);
3554         val.i = be32_to_cpu(p->val);
3555
3556         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3557             mutex_is_locked(mdev->state_mutex)) {
3558                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3559                 return 0;
3560         }
3561
3562         mask = convert_state(mask);
3563         val = convert_state(val);
3564
3565         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3566         drbd_send_sr_reply(mdev, rv);
3567
3568         drbd_md_sync(mdev);
3569
3570         return 0;
3571 }
3572
3573 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3574 {
3575         struct p_req_state *p = pi->data;
3576         union drbd_state mask, val;
3577         enum drbd_state_rv rv;
3578
3579         mask.i = be32_to_cpu(p->mask);
3580         val.i = be32_to_cpu(p->val);
3581
3582         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3583             mutex_is_locked(&tconn->cstate_mutex)) {
3584                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3585                 return 0;
3586         }
3587
3588         mask = convert_state(mask);
3589         val = convert_state(val);
3590
3591         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3592         conn_send_sr_reply(tconn, rv);
3593
3594         return 0;
3595 }
3596
3597 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3598 {
3599         struct drbd_conf *mdev;
3600         struct p_state *p = pi->data;
3601         union drbd_state os, ns, peer_state;
3602         enum drbd_disk_state real_peer_disk;
3603         enum chg_state_flags cs_flags;
3604         int rv;
3605
3606         mdev = vnr_to_mdev(tconn, pi->vnr);
3607         if (!mdev)
3608                 return config_unknown_volume(tconn, pi);
3609
3610         peer_state.i = be32_to_cpu(p->state);
3611
3612         real_peer_disk = peer_state.disk;
3613         if (peer_state.disk == D_NEGOTIATING) {
3614                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3615                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3616         }
3617
3618         spin_lock_irq(&mdev->tconn->req_lock);
3619  retry:
3620         os = ns = drbd_read_state(mdev);
3621         spin_unlock_irq(&mdev->tconn->req_lock);
3622
3623         /* peer says his disk is uptodate, while we think it is inconsistent,
3624          * and this happens while we think we have a sync going on. */
3625         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3626             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3627                 /* If we are (becoming) SyncSource, but peer is still in sync
3628                  * preparation, ignore its uptodate-ness to avoid flapping, it
3629                  * will change to inconsistent once the peer reaches active
3630                  * syncing states.
3631                  * It may have changed syncer-paused flags, however, so we
3632                  * cannot ignore this completely. */
3633                 if (peer_state.conn > C_CONNECTED &&
3634                     peer_state.conn < C_SYNC_SOURCE)
3635                         real_peer_disk = D_INCONSISTENT;
3636
3637                 /* if peer_state changes to connected at the same time,
3638                  * it explicitly notifies us that it finished resync.
3639                  * Maybe we should finish it up, too? */
3640                 else if (os.conn >= C_SYNC_SOURCE &&
3641                          peer_state.conn == C_CONNECTED) {
3642                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3643                                 drbd_resync_finished(mdev);
3644                         return 0;
3645                 }
3646         }
3647
3648         /* peer says his disk is inconsistent, while we think it is uptodate,
3649          * and this happens while the peer still thinks we have a sync going on,
3650          * but we think we are already done with the sync.
3651          * We ignore this to avoid flapping pdsk.
3652          * This should not happen, if the peer is a recent version of drbd. */
3653         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3654             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3655                 real_peer_disk = D_UP_TO_DATE;
3656
3657         if (ns.conn == C_WF_REPORT_PARAMS)
3658                 ns.conn = C_CONNECTED;
3659
3660         if (peer_state.conn == C_AHEAD)
3661                 ns.conn = C_BEHIND;
3662
3663         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3664             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3665                 int cr; /* consider resync */
3666
3667                 /* if we established a new connection */
3668                 cr  = (os.conn < C_CONNECTED);
3669                 /* if we had an established connection
3670                  * and one of the nodes newly attaches a disk */
3671                 cr |= (os.conn == C_CONNECTED &&
3672                        (peer_state.disk == D_NEGOTIATING ||
3673                         os.disk == D_NEGOTIATING));
3674                 /* if we have both been inconsistent, and the peer has been
3675                  * forced to be UpToDate with --overwrite-data */
3676                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3677                 /* if we had been plain connected, and the admin requested to
3678                  * start a sync by "invalidate" or "invalidate-remote" */
3679                 cr |= (os.conn == C_CONNECTED &&
3680                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3681                                  peer_state.conn <= C_WF_BITMAP_T));
3682
3683                 if (cr)
3684                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3685
3686                 put_ldev(mdev);
3687                 if (ns.conn == C_MASK) {
3688                         ns.conn = C_CONNECTED;
3689                         if (mdev->state.disk == D_NEGOTIATING) {
3690                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3691                         } else if (peer_state.disk == D_NEGOTIATING) {
3692                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3693                                 peer_state.disk = D_DISKLESS;
3694                                 real_peer_disk = D_DISKLESS;
3695                         } else {
3696                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3697                                         return -EIO;
3698                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3699                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3700                                 return -EIO;
3701                         }
3702                 }
3703         }
3704
3705         spin_lock_irq(&mdev->tconn->req_lock);
3706         if (os.i != drbd_read_state(mdev).i)
3707                 goto retry;
3708         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3709         ns.peer = peer_state.role;
3710         ns.pdsk = real_peer_disk;
3711         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3712         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3713                 ns.disk = mdev->new_state_tmp.disk;
3714         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3715         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3716             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3717                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3718                    for temporary network outages! */
3719                 spin_unlock_irq(&mdev->tconn->req_lock);
3720                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3721                 tl_clear(mdev->tconn);
3722                 drbd_uuid_new_current(mdev);
3723                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3724                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3725                 return -EIO;
3726         }
3727         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3728         ns = drbd_read_state(mdev);
3729         spin_unlock_irq(&mdev->tconn->req_lock);
3730
3731         if (rv < SS_SUCCESS) {
3732                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3733                 return -EIO;
3734         }
3735
3736         if (os.conn > C_WF_REPORT_PARAMS) {
3737                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3738                     peer_state.disk != D_NEGOTIATING ) {
3739                         /* we want resync, peer has not yet decided to sync... */
3740                         /* Nowadays only used when forcing a node into primary role and
3741                            setting its disk to UpToDate with that */
3742                         drbd_send_uuids(mdev);
3743                         drbd_send_state(mdev);
3744                 }
3745         }
3746
3747         mutex_lock(&mdev->tconn->net_conf_update);
3748         mdev->tconn->net_conf->want_lose = 0; /* without copy; single bit op is atomic */
3749         mutex_unlock(&mdev->tconn->net_conf_update);
3750
3751         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3752
3753         return 0;
3754 }
3755
3756 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3757 {
3758         struct drbd_conf *mdev;
3759         struct p_rs_uuid *p = pi->data;
3760
3761         mdev = vnr_to_mdev(tconn, pi->vnr);
3762         if (!mdev)
3763                 return -EIO;
3764
3765         wait_event(mdev->misc_wait,
3766                    mdev->state.conn == C_WF_SYNC_UUID ||
3767                    mdev->state.conn == C_BEHIND ||
3768                    mdev->state.conn < C_CONNECTED ||
3769                    mdev->state.disk < D_NEGOTIATING);
3770
3771         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3772
3773         /* Here the _drbd_uuid_ functions are right, current should
3774            _not_ be rotated into the history */
3775         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3776                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3777                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3778
3779                 drbd_print_uuids(mdev, "updated sync uuid");
3780                 drbd_start_resync(mdev, C_SYNC_TARGET);
3781
3782                 put_ldev(mdev);
3783         } else
3784                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3785
3786         return 0;
3787 }
3788
3789 /**
3790  * receive_bitmap_plain
3791  *
3792  * Return 0 when done, 1 when another iteration is needed, and a negative error
3793  * code upon failure.
3794  */
3795 static int
3796 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3797                      unsigned long *p, struct bm_xfer_ctx *c)
3798 {
3799         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3800                                  drbd_header_size(mdev->tconn);
3801         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3802                                        c->bm_words - c->word_offset);
3803         unsigned int want = num_words * sizeof(*p);
3804         int err;
3805
3806         if (want != size) {
3807                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3808                 return -EIO;
3809         }
3810         if (want == 0)
3811                 return 0;
3812         err = drbd_recv_all(mdev->tconn, p, want);
3813         if (err)
3814                 return err;
3815
3816         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3817
3818         c->word_offset += num_words;
3819         c->bit_offset = c->word_offset * BITS_PER_LONG;
3820         if (c->bit_offset > c->bm_bits)
3821                 c->bit_offset = c->bm_bits;
3822
3823         return 1;
3824 }
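
/*
 * For instance, on a 64-bit host with a 4 KiB socket buffer and an 8 byte
 * header (assumed values), data_size is 4088 bytes, so at most 511 longs
 * (4088 bytes) of bitmap are merged per P_BITMAP packet; the want != size
 * check means the sender must fill each packet completely, except possibly
 * for the last one.
 */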
3825
3826 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3827 {
3828         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3829 }
3830
3831 static int dcbp_get_start(struct p_compressed_bm *p)
3832 {
3833         return (p->encoding & 0x80) != 0;
3834 }
3835
3836 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3837 {
3838         return (p->encoding >> 4) & 0x7;
3839 }
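
/*
 * Layout of the p_compressed_bm "encoding" byte, as read by the three
 * accessors above:
 *   bit  7     whether the first decoded run describes set bits
 *   bits 6..4  number of padding bits in the last byte of the code
 *   bits 3..0  enum drbd_bitmap_code; only RLE_VLI_Bits is handled below
 */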
3840
3841 /**
3842  * recv_bm_rle_bits
3843  *
3844  * Return 0 when done, 1 when another iteration is needed, and a negative error
3845  * code upon failure.
3846  */
3847 static int
3848 recv_bm_rle_bits(struct drbd_conf *mdev,
3849                 struct p_compressed_bm *p,
3850                  struct bm_xfer_ctx *c,
3851                  unsigned int len)
3852 {
3853         struct bitstream bs;
3854         u64 look_ahead;
3855         u64 rl;
3856         u64 tmp;
3857         unsigned long s = c->bit_offset;
3858         unsigned long e;
3859         int toggle = dcbp_get_start(p);
3860         int have;
3861         int bits;
3862
3863         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3864
3865         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3866         if (bits < 0)
3867                 return -EIO;
3868
3869         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3870                 bits = vli_decode_bits(&rl, look_ahead);
3871                 if (bits <= 0)
3872                         return -EIO;
3873
3874                 if (toggle) {
3875                         e = s + rl -1;
3876                         if (e >= c->bm_bits) {
3877                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3878                                 return -EIO;
3879                         }
3880                         _drbd_bm_set_bits(mdev, s, e);
3881                 }
3882
3883                 if (have < bits) {
3884                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3885                                 have, bits, look_ahead,
3886                                 (unsigned int)(bs.cur.b - p->code),
3887                                 (unsigned int)bs.buf_len);
3888                         return -EIO;
3889                 }
3890                 look_ahead >>= bits;
3891                 have -= bits;
3892
3893                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3894                 if (bits < 0)
3895                         return -EIO;
3896                 look_ahead |= tmp << have;
3897                 have += bits;
3898         }
3899
3900         c->bit_offset = s;
3901         bm_xfer_ctx_bit_to_word_offset(c);
3902
3903         return (s != c->bm_bits);
3904 }
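
/*
 * In other words: the payload is a sequence of VLI encoded run lengths,
 * alternating between runs of cleared and of set bits (the role of the
 * first run is given by the start flag in the encoding byte).  Runs of set
 * bits are applied with _drbd_bm_set_bits(); runs of cleared bits only
 * advance the bit offset, so this decoder never clears bits in the local
 * bitmap.
 */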
3905
3906 /**
3907  * decode_bitmap_c
3908  *
3909  * Return 0 when done, 1 when another iteration is needed, and a negative error
3910  * code upon failure.
3911  */
3912 static int
3913 decode_bitmap_c(struct drbd_conf *mdev,
3914                 struct p_compressed_bm *p,
3915                 struct bm_xfer_ctx *c,
3916                 unsigned int len)
3917 {
3918         if (dcbp_get_code(p) == RLE_VLI_Bits)
3919                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3920
3921         /* other variants had been implemented for evaluation,
3922          * but have been dropped as this one turned out to be "best"
3923          * during all our tests. */
3924
3925         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3926         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3927         return -EIO;
3928 }
3929
3930 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3931                 const char *direction, struct bm_xfer_ctx *c)
3932 {
3933         /* what would it take to transfer it "plaintext" */
3934         unsigned int header_size = drbd_header_size(mdev->tconn);
3935         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3936         unsigned int plain =
3937                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3938                 c->bm_words * sizeof(unsigned long);
3939         unsigned int total = c->bytes[0] + c->bytes[1];
3940         unsigned int r;
3941
3942         /* total can not be zero. but just in case: */
3943         if (total == 0)
3944                 return;
3945
3946         /* don't report if not compressed */
3947         if (total >= plain)
3948                 return;
3949
3950         /* total < plain. check for overflow, still */
3951         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3952                                     : (1000 * total / plain);
3953
3954         if (r > 1000)
3955                 r = 1000;
3956
3957         r = 1000 - r;
3958         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3959              "total %u; compression: %u.%u%%\n",
3960                         direction,
3961                         c->bytes[1], c->packets[1],
3962                         c->bytes[0], c->packets[0],
3963                         total, r/10, r % 10);
3964 }
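
/*
 * Numeric example (assuming an 8 byte header, a 4 KiB socket buffer and
 * 64-bit longs): bm_words = 4096 gives plain = 8 * (2 + 1) + 4096 * 8 =
 * 32792 bytes.  If the compressed transfer took total = 1640 bytes, then
 * r = 1000 * 1640 / 32792 = 50, and the line reports "compression: 95.0%".
 */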
3965
3966 /* Since we are processing the bitfield from lower addresses to higher,
3967    it does not matter whether we process it in 32 bit chunks or 64 bit
3968    chunks, as long as it is little endian. (Understand it as a byte stream,
3969    beginning with the lowest byte...) If we used big endian
3970    we would need to process it from the highest address to the lowest,
3971    in order to be agnostic to the 32 vs 64 bit issue.
3972
3973    Returns 0 on success, a negative error code otherwise. */
3974 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3975 {
3976         struct drbd_conf *mdev;
3977         struct bm_xfer_ctx c;
3978         int err;
3979
3980         mdev = vnr_to_mdev(tconn, pi->vnr);
3981         if (!mdev)
3982                 return -EIO;
3983
3984         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3985         /* you are supposed to send additional out-of-sync information
3986          * if you actually set bits during this phase */
3987
3988         c = (struct bm_xfer_ctx) {
3989                 .bm_bits = drbd_bm_bits(mdev),
3990                 .bm_words = drbd_bm_words(mdev),
3991         };
3992
3993         for(;;) {
3994                 if (pi->cmd == P_BITMAP)
3995                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3996                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3997                         /* MAYBE: sanity check that we speak proto >= 90,
3998                          * and the feature is enabled! */
3999                         struct p_compressed_bm *p = pi->data;
4000
4001                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4002                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4003                                 err = -EIO;
4004                                 goto out;
4005                         }
4006                         if (pi->size <= sizeof(*p)) {
4007                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4008                                 err = -EIO;
4009                                 goto out;
4010                         }
4011                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4012                         if (err)
4013                                goto out;
4014                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4015                 } else {
4016                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4017                         err = -EIO;
4018                         goto out;
4019                 }
4020
4021                 c.packets[pi->cmd == P_BITMAP]++;
4022                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4023
4024                 if (err <= 0) {
4025                         if (err < 0)
4026                                 goto out;
4027                         break;
4028                 }
4029                 err = drbd_recv_header(mdev->tconn, pi);
4030                 if (err)
4031                         goto out;
4032         }
4033
4034         INFO_bm_xfer_stats(mdev, "receive", &c);
4035
4036         if (mdev->state.conn == C_WF_BITMAP_T) {
4037                 enum drbd_state_rv rv;
4038
4039                 err = drbd_send_bitmap(mdev);
4040                 if (err)
4041                         goto out;
4042                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4043                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4044                 D_ASSERT(rv == SS_SUCCESS);
4045         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4046                 /* admin may have requested C_DISCONNECTING,
4047                  * other threads may have noticed network errors */
4048                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4049                     drbd_conn_str(mdev->state.conn));
4050         }
4051         err = 0;
4052
4053  out:
4054         drbd_bm_unlock(mdev);
4055         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4056                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4057         return err;
4058 }
4059
4060 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4061 {
4062         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4063                  pi->cmd, pi->size);
4064
4065         return ignore_remaining_packet(tconn, pi);
4066 }
4067
4068 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4069 {
4070         /* Make sure we've acked all the TCP data associated
4071          * with the data requests being unplugged */
4072         drbd_tcp_quickack(tconn->data.socket);
4073
4074         return 0;
4075 }
4076
4077 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4078 {
4079         struct drbd_conf *mdev;
4080         struct p_block_desc *p = pi->data;
4081
4082         mdev = vnr_to_mdev(tconn, pi->vnr);
4083         if (!mdev)
4084                 return -EIO;
4085
4086         switch (mdev->state.conn) {
4087         case C_WF_SYNC_UUID:
4088         case C_WF_BITMAP_T:
4089         case C_BEHIND:
4090                 break;
4091         default:
4092                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4093                                 drbd_conn_str(mdev->state.conn));
4094         }
4095
4096         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4097
4098         return 0;
4099 }
4100
4101 struct data_cmd {
4102         int expect_payload;
4103         size_t pkt_size;
4104         int (*fn)(struct drbd_tconn *, struct packet_info *);
4105 };
4106
4107 static struct data_cmd drbd_cmd_handler[] = {
4108         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4109         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4110         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4111         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4112         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4113         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4114         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4115         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4116         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4117         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4118         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4119         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4120         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4121         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4122         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4123         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4124         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4125         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4126         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4127         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4128         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4129         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4130         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4131 };
4132
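     /*
      * Receiver main loop: read a packet header, look the packet type up in
      * drbd_cmd_handler[] above, receive the fixed-size sub-header (pkt_size)
      * into the preallocated per-connection buffer, and dispatch to the
      * handler; payload beyond the sub-header is only allowed for entries
      * with expect_payload set.  Any failure drops the connection to
      * C_PROTOCOL_ERROR.
      */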
4133 static void drbdd(struct drbd_tconn *tconn)
4134 {
4135         struct packet_info pi;
4136         size_t shs; /* sub header size */
4137         int err;
4138
4139         while (get_t_state(&tconn->receiver) == RUNNING) {
4140                 struct data_cmd *cmd;
4141
4142                 drbd_thread_current_set_cpu(&tconn->receiver);
4143                 if (drbd_recv_header(tconn, &pi))
4144                         goto err_out;
4145
4146                 cmd = &drbd_cmd_handler[pi.cmd];
4147                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4148                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4149                         goto err_out;
4150                 }
4151
4152                 shs = cmd->pkt_size;
4153                 if (pi.size > shs && !cmd->expect_payload) {
4154                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4155                         goto err_out;
4156                 }
4157
4158                 if (shs) {
4159                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4160                         if (err)
4161                                 goto err_out;
4162                         pi.size -= shs;
4163                 }
4164
4165                 err = cmd->fn(tconn, &pi);
4166                 if (err) {
4167                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4168                                  cmdname(pi.cmd), err, pi.size);
4169                         goto err_out;
4170                 }
4171         }
4172         return;
4173
4174     err_out:
4175         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4176 }
4177
4178 void conn_flush_workqueue(struct drbd_tconn *tconn)
4179 {
4180         struct drbd_wq_barrier barr;
4181
4182         barr.w.cb = w_prev_work_done;
4183         barr.w.tconn = tconn;
4184         init_completion(&barr.done);
4185         drbd_queue_work(&tconn->data.work, &barr.w);
4186         wait_for_completion(&barr.done);
4187 }
4188
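     /*
      * Tear down an established connection: stop the asender, close the
      * sockets, and run the per-volume cleanup (drbd_disconnected) for every
      * volume.  The old connection state is sampled and the transition to
      * C_UNCONNECTED is made while holding req_lock, so it does not interleave
      * with a concurrently requested state change; if the admin had already
      * asked for C_DISCONNECTING, we continue all the way to C_STANDALONE.
      */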
4189 static void drbd_disconnect(struct drbd_tconn *tconn)
4190 {
4191         enum drbd_conns oc;
4192         int rv = SS_UNKNOWN_ERROR;
4193
4194         if (tconn->cstate == C_STANDALONE)
4195                 return;
4196
4197         /* asender does not clean up anything. it must not interfere, either */
4198         drbd_thread_stop(&tconn->asender);
4199         drbd_free_sock(tconn);
4200
4201         down_read(&drbd_cfg_rwsem);
4202         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4203         up_read(&drbd_cfg_rwsem);
4204         conn_info(tconn, "Connection closed\n");
4205
4206         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4207                 conn_try_outdate_peer_async(tconn);
4208
4209         spin_lock_irq(&tconn->req_lock);
4210         oc = tconn->cstate;
4211         if (oc >= C_UNCONNECTED)
4212                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4213
4214         spin_unlock_irq(&tconn->req_lock);
4215
4216         if (oc == C_DISCONNECTING)
4217                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4218 }
4219
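     /*
      * Per-volume cleanup, called for each volume from drbd_disconnect() via
      * idr_for_each(): wait for in-flight peer requests, cancel all resync
      * bookkeeping, stop the request and resync timers, flush the work queue,
      * drop the peer's UUIDs, clear the transfer log (unless I/O is
      * suspended), and finally sync the meta data.
      */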
4220 static int drbd_disconnected(int vnr, void *p, void *data)
4221 {
4222         struct drbd_conf *mdev = (struct drbd_conf *)p;
4223         enum drbd_fencing_p fp;
4224         unsigned int i;
4225
4226         /* wait for current activity to cease. */
4227         spin_lock_irq(&mdev->tconn->req_lock);
4228         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4229         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4230         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4231         spin_unlock_irq(&mdev->tconn->req_lock);
4232
4233         /* We do not have data structures that would allow us to
4234          * get the rs_pending_cnt down to 0 again.
4235          *  * On C_SYNC_TARGET we do not have any data structures describing
4236          *    the pending RSDataRequest's we have sent.
4237          *  * On C_SYNC_SOURCE there is no data structure that tracks
4238          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4239          *  And no, it is not the sum of the reference counts in the
4240          *  resync_LRU. The resync_LRU tracks the whole operation including
4241          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4242          *  on the fly. */
4243         drbd_rs_cancel_all(mdev);
4244         mdev->rs_total = 0;
4245         mdev->rs_failed = 0;
4246         atomic_set(&mdev->rs_pending_cnt, 0);
4247         wake_up(&mdev->misc_wait);
4248
4249         del_timer(&mdev->request_timer);
4250
4251         del_timer_sync(&mdev->resync_timer);
4252         resync_timer_fn((unsigned long)mdev);
4253
4254         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4255          * w_make_resync_request etc. which may still be on the worker queue
4256          * to be "canceled" */
4257         drbd_flush_workqueue(mdev);
4258
4259         drbd_finish_peer_reqs(mdev);
4260
4261         kfree(mdev->p_uuid);
4262         mdev->p_uuid = NULL;
4263
4264         if (!drbd_suspended(mdev))
4265                 tl_clear(mdev->tconn);
4266
4267         drbd_md_sync(mdev);
4268
4269         fp = FP_DONT_CARE;
4270         if (get_ldev(mdev)) {
4271                 fp = mdev->ldev->dc.fencing;
4272                 put_ldev(mdev);
4273         }
4274
4275         /* serialize with bitmap writeout triggered by the state change,
4276          * if any. */
4277         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4278
4279         /* tcp_close and release of sendpage pages can be deferred.  I don't
4280          * want to use SO_LINGER, because apparently it can be deferred for
4281          * more than 20 seconds (longest time I checked).
4282          *
4283          * Actually we don't care for exactly when the network stack does its
4284          * put_page(), but release our reference on these pages right here.
4285          */
4286         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4287         if (i)
4288                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4289         i = atomic_read(&mdev->pp_in_use_by_net);
4290         if (i)
4291                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4292         i = atomic_read(&mdev->pp_in_use);
4293         if (i)
4294                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4295
4296         D_ASSERT(list_empty(&mdev->read_ee));
4297         D_ASSERT(list_empty(&mdev->active_ee));
4298         D_ASSERT(list_empty(&mdev->sync_ee));
4299         D_ASSERT(list_empty(&mdev->done_ee));
4300
4301         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4302         atomic_set(&mdev->current_epoch->epoch_size, 0);
4303         D_ASSERT(list_empty(&mdev->current_epoch->list));
4304
4305         return 0;
4306 }
4307
4308 /*
4309  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4310  * we can agree on is stored in agreed_pro_version.
4311  *
4312  * feature flags and the reserved array should be enough room for future
4313  * enhancements of the handshake protocol, and possible plugins...
4314  *
4315  * for now, they are expected to be zero, but ignored.
4316  */
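     /*
      * Illustration (the numbers are made up): if our [PRO_VERSION_MIN,
      * PRO_VERSION_MAX] range were 86..100 and the peer announced 90..96, the
      * ranges overlap and drbd_do_features() below settles on
      * min(100, 96) = 96.  Disjoint ranges end the handshake with -1.
      */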
4317 static int drbd_send_features(struct drbd_tconn *tconn)
4318 {
4319         struct drbd_socket *sock;
4320         struct p_connection_features *p;
4321
4322         sock = &tconn->data;
4323         p = conn_prepare_command(tconn, sock);
4324         if (!p)
4325                 return -EIO;
4326         memset(p, 0, sizeof(*p));
4327         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4328         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4329         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4330 }
4331
4332 /*
4333  * return values:
4334  *   1 yes, we have a valid connection
4335  *   0 oops, did not work out, please try again
4336  *  -1 peer talks different language,
4337  *     no point in trying again, please go standalone.
4338  */
4339 static int drbd_do_features(struct drbd_tconn *tconn)
4340 {
4341         /* ASSERT current == tconn->receiver ... */
4342         struct p_connection_features *p;
4343         const int expect = sizeof(struct p_connection_features);
4344         struct packet_info pi;
4345         int err;
4346
4347         err = drbd_send_features(tconn);
4348         if (err)
4349                 return 0;
4350
4351         err = drbd_recv_header(tconn, &pi);
4352         if (err)
4353                 return 0;
4354
4355         if (pi.cmd != P_CONNECTION_FEATURES) {
4356                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4357                      cmdname(pi.cmd), pi.cmd);
4358                 return -1;
4359         }
4360
4361         if (pi.size != expect) {
4362                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4363                      expect, pi.size);
4364                 return -1;
4365         }
4366
4367         p = pi.data;
4368         err = drbd_recv_all_warn(tconn, p, expect);
4369         if (err)
4370                 return 0;
4371
4372         p->protocol_min = be32_to_cpu(p->protocol_min);
4373         p->protocol_max = be32_to_cpu(p->protocol_max);
4374         if (p->protocol_max == 0)
4375                 p->protocol_max = p->protocol_min;
4376
4377         if (PRO_VERSION_MAX < p->protocol_min ||
4378             PRO_VERSION_MIN > p->protocol_max)
4379                 goto incompat;
4380
4381         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4382
4383         conn_info(tconn, "Handshake successful: "
4384              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4385
4386         return 1;
4387
4388  incompat:
4389         conn_err(tconn, "incompatible DRBD dialects: "
4390             "I support %d-%d, peer supports %d-%d\n",
4391             PRO_VERSION_MIN, PRO_VERSION_MAX,
4392             p->protocol_min, p->protocol_max);
4393         return -1;
4394 }
4395
4396 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4397 static int drbd_do_auth(struct drbd_tconn *tconn)
4398 {
4399         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4400         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4401         return -1;
4402 }
4403 #else
4404 #define CHALLENGE_LEN 64
4405
4406 /* Return value:
4407         1 - auth succeeded,
4408         0 - failed, try again (network error),
4409         -1 - auth failed, don't try again.
4410 */
4411
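     /*
      * The exchange below, run symmetrically by both peers:
      *   send P_AUTH_CHALLENGE  carrying our CHALLENGE_LEN random bytes
      *   recv P_AUTH_CHALLENGE  carrying the peer's challenge
      *   send P_AUTH_RESPONSE   carrying HMAC(shared secret, peer's challenge)
      *   recv P_AUTH_RESPONSE   and compare it against HMAC(shared secret,
      *                          our challenge)
      * Transient (network) errors return 0 so the connect attempt can be
      * retried; a mismatch or an oversized challenge returns -1.
      */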
4412 static int drbd_do_auth(struct drbd_tconn *tconn)
4413 {
4414         struct drbd_socket *sock;
4415         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4416         struct scatterlist sg;
4417         char *response = NULL;
4418         char *right_response = NULL;
4419         char *peers_ch = NULL;
4420         unsigned int key_len;
4421         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4422         unsigned int resp_size;
4423         struct hash_desc desc;
4424         struct packet_info pi;
4425         struct net_conf *nc;
4426         int err, rv;
4427
4428         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4429
4430         rcu_read_lock();
4431         nc = rcu_dereference(tconn->net_conf);
4432         key_len = strlen(nc->shared_secret);
4433         memcpy(secret, nc->shared_secret, key_len);
4434         rcu_read_unlock();
4435
4436         desc.tfm = tconn->cram_hmac_tfm;
4437         desc.flags = 0;
4438
4439         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4440         if (rv) {
4441                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4442                 rv = -1;
4443                 goto fail;
4444         }
4445
4446         get_random_bytes(my_challenge, CHALLENGE_LEN);
4447
4448         sock = &tconn->data;
4449         if (!conn_prepare_command(tconn, sock)) {
4450                 rv = 0;
4451                 goto fail;
4452         }
4453         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4454                                 my_challenge, CHALLENGE_LEN);
4455         if (!rv)
4456                 goto fail;
4457
4458         err = drbd_recv_header(tconn, &pi);
4459         if (err) {
4460                 rv = 0;
4461                 goto fail;
4462         }
4463
4464         if (pi.cmd != P_AUTH_CHALLENGE) {
4465                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4466                     cmdname(pi.cmd), pi.cmd);
4467                 rv = 0;
4468                 goto fail;
4469         }
4470
4471         if (pi.size > CHALLENGE_LEN * 2) {
4472                 conn_err(tconn, "AuthChallenge payload too big.\n");
4473                 rv = -1;
4474                 goto fail;
4475         }
4476
4477         peers_ch = kmalloc(pi.size, GFP_NOIO);
4478         if (peers_ch == NULL) {
4479                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4480                 rv = -1;
4481                 goto fail;
4482         }
4483
4484         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4485         if (err) {
4486                 rv = 0;
4487                 goto fail;
4488         }
4489
4490         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4491         response = kmalloc(resp_size, GFP_NOIO);
4492         if (response == NULL) {
4493                 conn_err(tconn, "kmalloc of response failed\n");
4494                 rv = -1;
4495                 goto fail;
4496         }
4497
4498         sg_init_table(&sg, 1);
4499         sg_set_buf(&sg, peers_ch, pi.size);
4500
4501         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4502         if (rv) {
4503                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4504                 rv = -1;
4505                 goto fail;
4506         }
4507
4508         if (!conn_prepare_command(tconn, sock)) {
4509                 rv = 0;
4510                 goto fail;
4511         }
4512         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4513                                 response, resp_size);
4514         if (!rv)
4515                 goto fail;
4516
4517         err = drbd_recv_header(tconn, &pi);
4518         if (err) {
4519                 rv = 0;
4520                 goto fail;
4521         }
4522
4523         if (pi.cmd != P_AUTH_RESPONSE) {
4524                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4525                         cmdname(pi.cmd), pi.cmd);
4526                 rv = 0;
4527                 goto fail;
4528         }
4529
4530         if (pi.size != resp_size) {
4531                 conn_err(tconn, "AuthResponse payload has the wrong size\n");
4532                 rv = 0;
4533                 goto fail;
4534         }
4535
4536         err = drbd_recv_all_warn(tconn, response, resp_size);
4537         if (err) {
4538                 rv = 0;
4539                 goto fail;
4540         }
4541
4542         right_response = kmalloc(resp_size, GFP_NOIO);
4543         if (right_response == NULL) {
4544                 conn_err(tconn, "kmalloc of right_response failed\n");
4545                 rv = -1;
4546                 goto fail;
4547         }
4548
4549         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4550
4551         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4552         if (rv) {
4553                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4554                 rv = -1;
4555                 goto fail;
4556         }
4557
4558         rv = !memcmp(response, right_response, resp_size);
4559
4560         if (rv)
4561                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4562                      resp_size);
4563         else
4564                 rv = -1;
4565
4566  fail:
4567         kfree(peers_ch);
4568         kfree(response);
4569         kfree(right_response);
4570
4571         return rv;
4572 }
4573 #endif
4574
4575 int drbdd_init(struct drbd_thread *thi)
4576 {
4577         struct drbd_tconn *tconn = thi->tconn;
4578         int h;
4579
4580         conn_info(tconn, "receiver (re)started\n");
4581
4582         do {
4583                 h = drbd_connect(tconn);
4584                 if (h == 0) {
4585                         drbd_disconnect(tconn);
4586                         schedule_timeout_interruptible(HZ);
4587                 }
4588                 if (h == -1) {
4589                         conn_warn(tconn, "Discarding network configuration.\n");
4590                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4591                 }
4592         } while (h == 0);
4593
4594         if (h > 0)
4595                 drbdd(tconn);
4596
4597         drbd_disconnect(tconn);
4598
4599         conn_info(tconn, "receiver terminated\n");
4600         return 0;
4601 }
4602
4603 /* ********* acknowledge sender ******** */
4604
4605 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4606 {
4607         struct p_req_state_reply *p = pi->data;
4608         int retcode = be32_to_cpu(p->retcode);
4609
4610         if (retcode >= SS_SUCCESS) {
4611                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4612         } else {
4613                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4614                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4615                          drbd_set_st_err_str(retcode), retcode);
4616         }
4617         wake_up(&tconn->ping_wait);
4618
4619         return 0;
4620 }
4621
4622 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4623 {
4624         struct drbd_conf *mdev;
4625         struct p_req_state_reply *p = pi->data;
4626         int retcode = be32_to_cpu(p->retcode);
4627
4628         mdev = vnr_to_mdev(tconn, pi->vnr);
4629         if (!mdev)
4630                 return -EIO;
4631
4632         if (retcode >= SS_SUCCESS) {
4633                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4634         } else {
4635                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4636                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4637                         drbd_set_st_err_str(retcode), retcode);
4638         }
4639         wake_up(&mdev->state_wait);
4640
4641         return 0;
4642 }
4643
4644 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4645 {
4646         return drbd_send_ping_ack(tconn);
4647
4648 }
4649
4650 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4651 {
4652         /* restore idle timeout */
4653         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4654         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4655                 wake_up(&tconn->ping_wait);
4656
4657         return 0;
4658 }
4659
4660 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4661 {
4662         struct drbd_conf *mdev;
4663         struct p_block_ack *p = pi->data;
4664         sector_t sector = be64_to_cpu(p->sector);
4665         int blksize = be32_to_cpu(p->blksize);
4666
4667         mdev = vnr_to_mdev(tconn, pi->vnr);
4668         if (!mdev)
4669                 return -EIO;
4670
4671         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4672
4673         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4674
4675         if (get_ldev(mdev)) {
4676                 drbd_rs_complete_io(mdev, sector);
4677                 drbd_set_in_sync(mdev, sector, blksize);
4678                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4679                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4680                 put_ldev(mdev);
4681         }
4682         dec_rs_pending(mdev);
4683         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4684
4685         return 0;
4686 }
4687
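     /*
      * Look up the request identified by (id, sector) in the given interval
      * tree, apply the request state machine event 'what' to it, and complete
      * the master bio if that transition finished the request.  With
      * missing_ok, a request that is no longer in the tree is tolerated (see
      * the P_NEG_ACK handling in got_NegAck()).
      */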
4688 static int
4689 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4690                               struct rb_root *root, const char *func,
4691                               enum drbd_req_event what, bool missing_ok)
4692 {
4693         struct drbd_request *req;
4694         struct bio_and_error m;
4695
4696         spin_lock_irq(&mdev->tconn->req_lock);
4697         req = find_request(mdev, root, id, sector, missing_ok, func);
4698         if (unlikely(!req)) {
4699                 spin_unlock_irq(&mdev->tconn->req_lock);
4700                 return -EIO;
4701         }
4702         __req_mod(req, what, &m);
4703         spin_unlock_irq(&mdev->tconn->req_lock);
4704
4705         if (m.bio)
4706                 complete_master_bio(mdev, &m);
4707         return 0;
4708 }
4709
4710 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4711 {
4712         struct drbd_conf *mdev;
4713         struct p_block_ack *p = pi->data;
4714         sector_t sector = be64_to_cpu(p->sector);
4715         int blksize = be32_to_cpu(p->blksize);
4716         enum drbd_req_event what;
4717
4718         mdev = vnr_to_mdev(tconn, pi->vnr);
4719         if (!mdev)
4720                 return -EIO;
4721
4722         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4723
4724         if (p->block_id == ID_SYNCER) {
4725                 drbd_set_in_sync(mdev, sector, blksize);
4726                 dec_rs_pending(mdev);
4727                 return 0;
4728         }
4729         switch (pi->cmd) {
4730         case P_RS_WRITE_ACK:
4731                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4732                 break;
4733         case P_WRITE_ACK:
4734                 what = WRITE_ACKED_BY_PEER;
4735                 break;
4736         case P_RECV_ACK:
4737                 what = RECV_ACKED_BY_PEER;
4738                 break;
4739         case P_DISCARD_WRITE:
4740                 what = DISCARD_WRITE;
4741                 break;
4742         case P_RETRY_WRITE:
4743                 what = POSTPONE_WRITE;
4744                 break;
4745         default:
4746                 BUG();
4747         }
4748
4749         return validate_req_change_req_state(mdev, p->block_id, sector,
4750                                              &mdev->write_requests, __func__,
4751                                              what, false);
4752 }
4753
4754 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4755 {
4756         struct drbd_conf *mdev;
4757         struct p_block_ack *p = pi->data;
4758         sector_t sector = be64_to_cpu(p->sector);
4759         int size = be32_to_cpu(p->blksize);
4760         int err;
4761
4762         mdev = vnr_to_mdev(tconn, pi->vnr);
4763         if (!mdev)
4764                 return -EIO;
4765
4766         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4767
4768         if (p->block_id == ID_SYNCER) {
4769                 dec_rs_pending(mdev);
4770                 drbd_rs_failed_io(mdev, sector, size);
4771                 return 0;
4772         }
4773
4774         err = validate_req_change_req_state(mdev, p->block_id, sector,
4775                                             &mdev->write_requests, __func__,
4776                                             NEG_ACKED, true);
4777         if (err) {
4778                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4779                    The master bio might already be completed, therefore the
4780                    request is no longer in the collision hash. */
4781                 /* In Protocol B we might already have got a P_RECV_ACK
4782                    but then get a P_NEG_ACK afterwards. */
4783                 drbd_set_out_of_sync(mdev, sector, size);
4784         }
4785         return 0;
4786 }
4787
4788 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4789 {
4790         struct drbd_conf *mdev;
4791         struct p_block_ack *p = pi->data;
4792         sector_t sector = be64_to_cpu(p->sector);
4793
4794         mdev = vnr_to_mdev(tconn, pi->vnr);
4795         if (!mdev)
4796                 return -EIO;
4797
4798         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4799
4800         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4801             (unsigned long long)sector, be32_to_cpu(p->blksize));
4802
4803         return validate_req_change_req_state(mdev, p->block_id, sector,
4804                                              &mdev->read_requests, __func__,
4805                                              NEG_ACKED, false);
4806 }
4807
4808 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4809 {
4810         struct drbd_conf *mdev;
4811         sector_t sector;
4812         int size;
4813         struct p_block_ack *p = pi->data;
4814
4815         mdev = vnr_to_mdev(tconn, pi->vnr);
4816         if (!mdev)
4817                 return -EIO;
4818
4819         sector = be64_to_cpu(p->sector);
4820         size = be32_to_cpu(p->blksize);
4821
4822         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4823
4824         dec_rs_pending(mdev);
4825
4826         if (get_ldev_if_state(mdev, D_FAILED)) {
4827                 drbd_rs_complete_io(mdev, sector);
4828                 switch (pi->cmd) {
4829                 case P_NEG_RS_DREPLY:
4830                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
4831                 case P_RS_CANCEL:
4832                         break;
4833                 default:
4834                         BUG();
4835                 }
4836                 put_ldev(mdev);
4837         }
4838
4839         return 0;
4840 }
4841
4842 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4843 {
4844         struct drbd_conf *mdev;
4845         struct p_barrier_ack *p = pi->data;
4846
4847         mdev = vnr_to_mdev(tconn, pi->vnr);
4848         if (!mdev)
4849                 return -EIO;
4850
4851         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4852
4853         if (mdev->state.conn == C_AHEAD &&
4854             atomic_read(&mdev->ap_in_flight) == 0 &&
4855             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4856                 mdev->start_resync_timer.expires = jiffies + HZ;
4857                 add_timer(&mdev->start_resync_timer);
4858         }
4859
4860         return 0;
4861 }
4862
4863 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4864 {
4865         struct drbd_conf *mdev;
4866         struct p_block_ack *p = pi->data;
4867         struct drbd_work *w;
4868         sector_t sector;
4869         int size;
4870
4871         mdev = vnr_to_mdev(tconn, pi->vnr);
4872         if (!mdev)
4873                 return -EIO;
4874
4875         sector = be64_to_cpu(p->sector);
4876         size = be32_to_cpu(p->blksize);
4877
4878         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4879
4880         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4881                 drbd_ov_out_of_sync_found(mdev, sector, size);
4882         else
4883                 ov_out_of_sync_print(mdev);
4884
4885         if (!get_ldev(mdev))
4886                 return 0;
4887
4888         drbd_rs_complete_io(mdev, sector);
4889         dec_rs_pending(mdev);
4890
4891         --mdev->ov_left;
4892
4893         /* let's advance progress step marks only for every other megabyte */
4894         if ((mdev->ov_left & 0x200) == 0x200)
4895                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4896
4897         if (mdev->ov_left == 0) {
4898                 w = kmalloc(sizeof(*w), GFP_NOIO);
4899                 if (w) {
4900                         w->cb = w_ov_finished;
4901                         w->mdev = mdev;
4902                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4903                 } else {
4904                         dev_err(DEV, "kmalloc(w) failed.\n");
4905                         ov_out_of_sync_print(mdev);
4906                         drbd_resync_finished(mdev);
4907                 }
4908         }
4909         put_ldev(mdev);
4910         return 0;
4911 }
4912
4913 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4914 {
4915         return 0;
4916 }
4917
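     /*
      * Send the acks for all completed peer requests (done_ee) of every
      * volume.  The lists are re-checked under req_lock afterwards, since the
      * receiver may have completed more requests in the meantime; loop until
      * everything is drained, or return 1 on error.
      */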
4918 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4919 {
4920         struct drbd_conf *mdev;
4921         int i, not_empty = 0;
4922
4923         do {
4924                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4925                 flush_signals(current);
4926                 down_read(&drbd_cfg_rwsem);
4927                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4928                         if (drbd_finish_peer_reqs(mdev)) {
4929                                 up_read(&drbd_cfg_rwsem);
4930                                 return 1; /* error */
4931                         }
4932                 }
4933                 up_read(&drbd_cfg_rwsem);
4934                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4935
4936                 spin_lock_irq(&tconn->req_lock);
4937                 rcu_read_lock();
4938                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4939                         not_empty = !list_empty(&mdev->done_ee);
4940                         if (not_empty)
4941                                 break;
4942                 }
4943                 rcu_read_unlock();
4944                 spin_unlock_irq(&tconn->req_lock);
4945         } while (not_empty);
4946
4947         return 0;
4948 }
4949
4950 struct asender_cmd {
4951         size_t pkt_size;
4952         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4953 };
4954
4955 static struct asender_cmd asender_tbl[] = {
4956         [P_PING]            = { 0, got_Ping },
4957         [P_PING_ACK]        = { 0, got_PingAck },
4958         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4959         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4960         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4961         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4962         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4963         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4964         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4965         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4966         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4967         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4968         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4969         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4970         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4971         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4972         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4973 };
4974
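     /*
      * Meta-socket loop of the ack sender thread: send pings when asked,
      * flush pending acks via tconn_finish_peer_reqs(), then receive exactly
      * drbd_header_size() + asender_tbl[cmd].pkt_size bytes and dispatch.
      * A PingAck that does not arrive within the ping timeout, or any
      * receive/handler error, goes to C_NETWORK_FAILURE; an unknown packet
      * type goes to C_DISCONNECTING.
      */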
4975 int drbd_asender(struct drbd_thread *thi)
4976 {
4977         struct drbd_tconn *tconn = thi->tconn;
4978         struct asender_cmd *cmd = NULL;
4979         struct packet_info pi;
4980         int rv;
4981         void *buf    = tconn->meta.rbuf;
4982         int received = 0;
4983         unsigned int header_size = drbd_header_size(tconn);
4984         int expect   = header_size;
4985         bool ping_timeout_active = false;
4986         struct net_conf *nc;
4987         int ping_timeo, no_cork, ping_int;
4988
4989         current->policy = SCHED_RR;  /* Make this a realtime task! */
4990         current->rt_priority = 2;    /* more important than all other tasks */
4991
4992         while (get_t_state(thi) == RUNNING) {
4993                 drbd_thread_current_set_cpu(thi);
4994
4995                 rcu_read_lock();
4996                 nc = rcu_dereference(tconn->net_conf);
4997                 ping_timeo = nc->ping_timeo;
4998                 no_cork = nc->no_cork;
4999                 ping_int = nc->ping_int;
5000                 rcu_read_unlock();
5001
5002                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5003                         if (drbd_send_ping(tconn)) {
5004                                 conn_err(tconn, "drbd_send_ping has failed\n");
5005                                 goto reconnect;
5006                         }
5007                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5008                         ping_timeout_active = true;
5009                 }
5010
5011                 /* TODO: conditionally cork; it may hurt latency if we cork without
5012                    much to send */
5013                 if (!no_cork)
5014                         drbd_tcp_cork(tconn->meta.socket);
5015                 if (tconn_finish_peer_reqs(tconn)) {
5016                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5017                         goto reconnect;
5018                 }
5019                 /* but unconditionally uncork unless disabled */
5020                 if (!no_cork)
5021                         drbd_tcp_uncork(tconn->meta.socket);
5022
5023                 /* short circuit, recv_msg would return EINTR anyways. */
5024                 if (signal_pending(current))
5025                         continue;
5026
5027                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5028                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5029
5030                 flush_signals(current);
5031
5032                 /* Note:
5033                  * -EINTR        (on meta) we got a signal
5034                  * -EAGAIN       (on meta) rcvtimeo expired
5035                  * -ECONNRESET   other side closed the connection
5036                  * -ERESTARTSYS  (on data) we got a signal
5037                  * rv <  0       other than above: unexpected error!
5038                  * rv == expected: full header or command
5039                  * rv <  expected: "woken" by signal during receive
5040                  * rv == 0       : "connection shut down by peer"
5041                  */
5042                 if (likely(rv > 0)) {
5043                         received += rv;
5044                         buf      += rv;
5045                 } else if (rv == 0) {
5046                         conn_err(tconn, "meta connection shut down by peer.\n");
5047                         goto reconnect;
5048                 } else if (rv == -EAGAIN) {
5049                         /* If the data socket received something meanwhile,
5050                          * that is good enough: peer is still alive. */
5051                         if (time_after(tconn->last_received,
5052                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5053                                 continue;
5054                         if (ping_timeout_active) {
5055                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5056                                 goto reconnect;
5057                         }
5058                         set_bit(SEND_PING, &tconn->flags);
5059                         continue;
5060                 } else if (rv == -EINTR) {
5061                         continue;
5062                 } else {
5063                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5064                         goto reconnect;
5065                 }
5066
5067                 if (received == expect && cmd == NULL) {
5068                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5069                                 goto reconnect;
5070                         cmd = &asender_tbl[pi.cmd];
5071                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5072                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
5073                                         pi.cmd, pi.size);
5074                                 goto disconnect;
5075                         }
5076                         expect = header_size + cmd->pkt_size;
5077                         if (pi.size != expect - header_size) {
5078                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5079                                         pi.cmd, pi.size);
5080                                 goto reconnect;
5081                         }
5082                 }
5083                 if (received == expect) {
5084                         int err;
5085
5086                         err = cmd->fn(tconn, &pi);
5087                         if (err) {
5088                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5089                                 goto reconnect;
5090                         }
5091
5092                         tconn->last_received = jiffies;
5093
5094                         if (cmd == &asender_tbl[P_PING_ACK]) {
5095                                 /* restore idle timeout */
5096                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5097                                 ping_timeout_active = false;
5098                         }
5099
5100                         buf      = tconn->meta.rbuf;
5101                         received = 0;
5102                         expect   = header_size;
5103                         cmd      = NULL;
5104                 }
5105         }
5106
5107         if (0) {
5108 reconnect:
5109                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5110         }
5111         if (0) {
5112 disconnect:
5113                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5114         }
5115         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5116
5117         conn_info(tconn, "asender terminated\n");
5118
5119         return 0;
5120 }