drbd: Fix the WO=drain implementation for multiple volumes
[firefly-linux-kernel-4.4.55.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
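/* Illustrative sketch, not part of the original file: how the page chain
 * helpers above compose.  A "private" chain is spliced into a pool head
 * under drbd_pp_lock, and a fixed number of pages can later be taken off
 * again.  example_splice_chain() is a hypothetical helper. */
static void __maybe_unused example_splice_chain(struct page **pool,
                                                struct page *chain)
{
        int n;
        struct page *tail = page_chain_tail(chain, &n);

        spin_lock(&drbd_pp_lock);
        page_chain_add(pool, chain, tail);      /* chain is now the pool head */
        chain = page_chain_del(pool, n);        /* NULL if fewer than n linked */
        spin_unlock(&drbd_pp_lock);

        if (chain)
                page_chain_free(chain);         /* drop our page references */
}
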
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that has not
208            finished, we can stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
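/* Illustrative sketch, not part of the original file: the usual pairing of
 * drbd_alloc_pages() with drbd_free_pages().  With retry == true the
 * allocation only fails if the caller is signalled.  example_with_pages()
 * is a hypothetical helper. */
static int __maybe_unused example_with_pages(struct drbd_conf *mdev,
                                             unsigned int number)
{
        /* may sleep until enough pages are freed elsewhere */
        struct page *chain = drbd_alloc_pages(mdev, number, true);

        if (!chain)
                return -EINTR;  /* interrupted by a signal */

        /* ... fill or read the chain, e.g. via page_chain_for_each() ... */

        drbd_free_pages(mdev, chain, 0);        /* 0: accounted in pp_in_use */
        return 0;
}
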
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write.
424          * All of them ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
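/* Illustrative sketch, not part of the original file: the locking contract
 * spelled out in the comment block further above.  Without req_lock held,
 * use the wrapper; with req_lock held, use the underscore variant.
 * example_drain_done_ee() is a hypothetical helper. */
static void __maybe_unused example_drain_done_ee(struct drbd_conf *mdev)
{
        /* caller does not hold req_lock */
        drbd_wait_ee_list_empty(mdev, &mdev->done_ee);

        /* caller already holds req_lock */
        spin_lock_irq(&mdev->tconn->req_lock);
        _drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
        spin_unlock_irq(&mdev->tconn->req_lock);
}
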
464 /* See also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629         sndbuf_size = nc->sndbuf_size;
630         rcvbuf_size = nc->rcvbuf_size;
631         connect_int = nc->connect_int;
632         rcu_read_unlock();
633
634         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
644
645         what = "sock_create_kern";
646         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647                                SOCK_STREAM, IPPROTO_TCP, &sock);
648         if (err < 0) {
649                 sock = NULL;
650                 goto out;
651         }
652
653         sock->sk->sk_rcvtimeo =
654         sock->sk->sk_sndtimeo = connect_int * HZ;
655         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
656
657         /* explicitly bind to the configured IP as source IP
658          * for the outgoing connections.
659          * This is needed for multihomed hosts and to be
660          * able to use lo: interfaces for drbd.
661          * Make sure to use 0 as port number, so linux selects
662          * a free one dynamically.
663          */
664         what = "bind before connect";
665         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
666         if (err < 0)
667                 goto out;
668
669         /* connect may fail, peer not yet available.
670          * stay C_WF_CONNECTION, don't go Disconnecting! */
671         disconnect_on_error = 0;
672         what = "connect";
673         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
674
675 out:
676         if (err < 0) {
677                 if (sock) {
678                         sock_release(sock);
679                         sock = NULL;
680                 }
681                 switch (-err) {
682                         /* timeout, busy, signal pending */
683                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684                 case EINTR: case ERESTARTSYS:
685                         /* peer not (yet) available, network problem */
686                 case ECONNREFUSED: case ENETUNREACH:
687                 case EHOSTDOWN:    case EHOSTUNREACH:
688                         disconnect_on_error = 0;
689                         break;
690                 default:
691                         conn_err(tconn, "%s failed, err = %d\n", what, err);
692                 }
693                 if (disconnect_on_error)
694                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
695         }
696
697         return sock;
698 }
699
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
701 {
702         int timeo, err, my_addr_len;
703         int sndbuf_size, rcvbuf_size, connect_int;
704         struct socket *s_estab = NULL, *s_listen;
705         struct sockaddr_in6 my_addr;
706         struct net_conf *nc;
707         const char *what;
708
709         rcu_read_lock();
710         nc = rcu_dereference(tconn->net_conf);
711         if (!nc) {
712                 rcu_read_unlock();
713                 return NULL;
714         }
715         sndbuf_size = nc->sndbuf_size;
716         rcvbuf_size = nc->rcvbuf_size;
717         connect_int = nc->connect_int;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         timeo = connect_int * HZ;
732         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
733
734         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
735         s_listen->sk->sk_rcvtimeo = timeo;
736         s_listen->sk->sk_sndtimeo = timeo;
737         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
738
739         what = "bind before listen";
740         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
741         if (err < 0)
742                 goto out;
743
744         err = drbd_accept(&what, s_listen, &s_estab);
745
746 out:
747         if (s_listen)
748                 sock_release(s_listen);
749         if (err < 0) {
750                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751                         conn_err(tconn, "%s failed, err = %d\n", what, err);
752                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
753                 }
754         }
755
756         return s_estab;
757 }
758
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
760
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762                              enum drbd_packet cmd)
763 {
764         if (!conn_prepare_command(tconn, sock))
765                 return -EIO;
766         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
767 }
768
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
770 {
771         unsigned int header_size = drbd_header_size(tconn);
772         struct packet_info pi;
773         int err;
774
775         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776         if (err != header_size) {
777                 if (err >= 0)
778                         err = -EIO;
779                 return err;
780         }
781         err = decode_header(tconn, tconn->data.rbuf, &pi);
782         if (err)
783                 return err;
784         return pi.cmd;
785 }
786
787 /**
788  * drbd_socket_okay() - Free the socket if its connection is not okay
789  * @sock:       pointer to the pointer to the socket.
790  */
791 static int drbd_socket_okay(struct socket **sock)
792 {
793         int rr;
794         char tb[4];
795
796         if (!*sock)
797                 return false;
798
799         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
800
801         if (rr > 0 || rr == -EAGAIN) {
802                 return true;
803         } else {
804                 sock_release(*sock);
805                 *sock = NULL;
806                 return false;
807         }
808 }
809 /* Gets called if a connection is established, or if a new minor gets created
810    in a connection */
811 int drbd_connected(struct drbd_conf *mdev)
812 {
813         int err;
814
815         atomic_set(&mdev->packet_seq, 0);
816         mdev->peer_seq = 0;
817
818         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819                 &mdev->tconn->cstate_mutex :
820                 &mdev->own_state_mutex;
821
822         err = drbd_send_sync_param(mdev);
823         if (!err)
824                 err = drbd_send_sizes(mdev, 0, 0);
825         if (!err)
826                 err = drbd_send_uuids(mdev);
827         if (!err)
828                 err = drbd_send_current_state(mdev);
829         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830         clear_bit(RESIZE_PENDING, &mdev->flags);
831         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
832         return err;
833 }
834
835 /*
836  * return values:
837  *   1 yes, we have a valid connection
838  *   0 oops, did not work out, please try again
839  *  -1 peer talks different language,
840  *     no point in trying again, please go standalone.
841  *  -2 We do not have a network config...
842  */
843 static int conn_connect(struct drbd_tconn *tconn)
844 {
845         struct socket *sock, *msock;
846         struct drbd_conf *mdev;
847         struct net_conf *nc;
848         int vnr, timeout, try, h, ok;
849         bool discard_my_data;
850
851         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
852                 return -2;
853
854         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
855
856         /* Assume that the peer only understands protocol 80 until we know better.  */
857         tconn->agreed_pro_version = 80;
858
859         do {
860                 struct socket *s;
861
862                 for (try = 0;;) {
863                         /* 3 tries, this should take less than a second! */
864                         s = drbd_try_connect(tconn);
865                         if (s || ++try >= 3)
866                                 break;
867                         /* give the other side time to call bind() & listen() */
868                         schedule_timeout_interruptible(HZ / 10);
869                 }
870
871                 if (s) {
872                         if (!tconn->data.socket) {
873                                 tconn->data.socket = s;
874                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
875                         } else if (!tconn->meta.socket) {
876                                 tconn->meta.socket = s;
877                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
878                         } else {
879                                 conn_err(tconn, "Logic error in conn_connect()\n");
880                                 goto out_release_sockets;
881                         }
882                 }
883
884                 if (tconn->data.socket && tconn->meta.socket) {
885                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
886                         ok = drbd_socket_okay(&tconn->data.socket);
887                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
888                         if (ok)
889                                 break;
890                 }
891
892 retry:
893                 s = drbd_wait_for_connect(tconn);
894                 if (s) {
895                         try = receive_first_packet(tconn, s);
896                         drbd_socket_okay(&tconn->data.socket);
897                         drbd_socket_okay(&tconn->meta.socket);
898                         switch (try) {
899                         case P_INITIAL_DATA:
900                                 if (tconn->data.socket) {
901                                         conn_warn(tconn, "initial packet S crossed\n");
902                                         sock_release(tconn->data.socket);
903                                 }
904                                 tconn->data.socket = s;
905                                 break;
906                         case P_INITIAL_META:
907                                 if (tconn->meta.socket) {
908                                         conn_warn(tconn, "initial packet M crossed\n");
909                                         sock_release(tconn->meta.socket);
910                                 }
911                                 tconn->meta.socket = s;
912                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
913                                 break;
914                         default:
915                                 conn_warn(tconn, "Error receiving initial packet\n");
916                                 sock_release(s);
917                                 if (random32() & 1)
918                                         goto retry;
919                         }
920                 }
921
922                 if (tconn->cstate <= C_DISCONNECTING)
923                         goto out_release_sockets;
924                 if (signal_pending(current)) {
925                         flush_signals(current);
926                         smp_rmb();
927                         if (get_t_state(&tconn->receiver) == EXITING)
928                                 goto out_release_sockets;
929                 }
930
931                 if (tconn->data.socket && tconn->meta.socket) {
932                         ok = drbd_socket_okay(&tconn->data.socket);
933                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
934                         if (ok)
935                                 break;
936                 }
937         } while (1);
938
939         sock  = tconn->data.socket;
940         msock = tconn->meta.socket;
941
942         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
943         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
944
945         sock->sk->sk_allocation = GFP_NOIO;
946         msock->sk->sk_allocation = GFP_NOIO;
947
948         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
949         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
950
951         /* NOT YET ...
952          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
953          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
954          * first set it to the P_CONNECTION_FEATURES timeout,
955          * which we set to 4x the configured ping_timeout. */
956         rcu_read_lock();
957         nc = rcu_dereference(tconn->net_conf);
958
959         sock->sk->sk_sndtimeo =
960         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
961
962         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
963         timeout = nc->timeout * HZ / 10;
964         discard_my_data = nc->discard_my_data;
965         rcu_read_unlock();
966
967         msock->sk->sk_sndtimeo = timeout;
968
969         /* we don't want delays.
970          * we use TCP_CORK where appropriate, though */
971         drbd_tcp_nodelay(sock);
972         drbd_tcp_nodelay(msock);
973
974         tconn->last_received = jiffies;
975
976         h = drbd_do_features(tconn);
977         if (h <= 0)
978                 return h;
979
980         if (tconn->cram_hmac_tfm) {
981                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
982                 switch (drbd_do_auth(tconn)) {
983                 case -1:
984                         conn_err(tconn, "Authentication of peer failed\n");
985                         return -1;
986                 case 0:
987                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
988                         return 0;
989                 }
990         }
991
992         sock->sk->sk_sndtimeo = timeout;
993         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
994
995         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
996                 return -1;
997
998         rcu_read_lock();
999         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1000                 kref_get(&mdev->kref);
1001                 rcu_read_unlock();
1002
1003                 if (discard_my_data)
1004                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1005                 else
1006                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1007
1008                 drbd_connected(mdev);
1009                 kref_put(&mdev->kref, &drbd_minor_destroy);
1010                 rcu_read_lock();
1011         }
1012         rcu_read_unlock();
1013
1014         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1015                 return 0;
1016
1017         drbd_thread_start(&tconn->asender);
1018
1019         mutex_lock(&tconn->conf_update);
1020         /* The discard_my_data flag is a single-shot modifier to the next
1021          * connection attempt, the handshake of which is now well underway.
1022          * No need for rcu style copying of the whole struct
1023          * just to clear a single value. */
1024         tconn->net_conf->discard_my_data = 0;
1025         mutex_unlock(&tconn->conf_update);
1026
1027         return h;
1028
1029 out_release_sockets:
1030         if (tconn->data.socket) {
1031                 sock_release(tconn->data.socket);
1032                 tconn->data.socket = NULL;
1033         }
1034         if (tconn->meta.socket) {
1035                 sock_release(tconn->meta.socket);
1036                 tconn->meta.socket = NULL;
1037         }
1038         return -1;
1039 }
1040
1041 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1042 {
1043         unsigned int header_size = drbd_header_size(tconn);
1044
1045         if (header_size == sizeof(struct p_header100) &&
1046             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1047                 struct p_header100 *h = header;
1048                 if (h->pad != 0) {
1049                         conn_err(tconn, "Header padding is not zero\n");
1050                         return -EINVAL;
1051                 }
1052                 pi->vnr = be16_to_cpu(h->volume);
1053                 pi->cmd = be16_to_cpu(h->command);
1054                 pi->size = be32_to_cpu(h->length);
1055         } else if (header_size == sizeof(struct p_header95) &&
1056                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1057                 struct p_header95 *h = header;
1058                 pi->cmd = be16_to_cpu(h->command);
1059                 pi->size = be32_to_cpu(h->length);
1060                 pi->vnr = 0;
1061         } else if (header_size == sizeof(struct p_header80) &&
1062                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1063                 struct p_header80 *h = header;
1064                 pi->cmd = be16_to_cpu(h->command);
1065                 pi->size = be16_to_cpu(h->length);
1066                 pi->vnr = 0;
1067         } else {
1068                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1069                          be32_to_cpu(*(__be32 *)header),
1070                          tconn->agreed_pro_version);
1071                 return -EINVAL;
1072         }
1073         pi->data = header + header_size;
1074         return 0;
1075 }
1076
1077 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1078 {
1079         void *buffer = tconn->data.rbuf;
1080         int err;
1081
1082         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1083         if (err)
1084                 return err;
1085
1086         err = decode_header(tconn, buffer, pi);
1087         tconn->last_received = jiffies;
1088
1089         return err;
1090 }
1091
1092 static void drbd_flush(struct drbd_tconn *tconn)
1093 {
1094         int rv;
1095         struct drbd_conf *mdev;
1096         int vnr;
1097
1098         if (tconn->write_ordering >= WO_bdev_flush) {
1099                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1100                         if (get_ldev(mdev)) {
1101                                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1102                                                         NULL);
1103                                 put_ldev(mdev);
1104
1105                                 if (rv) {
1106                                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
1107                                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1108                                          * don't try again for ANY return value != 0
1109                                          * if (rv == -EOPNOTSUPP) */
1110                                         drbd_bump_write_ordering(tconn, WO_drain_io);
1111                                         break;
1112                                 }
1113                         }
1114                 }
1115         }
1116 }
1117
1118 /**
1119  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1120  * @tconn:      DRBD connection.
1121  * @epoch:      Epoch object.
1122  * @ev:         Epoch event.
1123  */
1124 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1125                                                struct drbd_epoch *epoch,
1126                                                enum epoch_event ev)
1127 {
1128         int epoch_size;
1129         struct drbd_epoch *next_epoch;
1130         enum finish_epoch rv = FE_STILL_LIVE;
1131
1132         spin_lock(&tconn->epoch_lock);
1133         do {
1134                 next_epoch = NULL;
1135
1136                 epoch_size = atomic_read(&epoch->epoch_size);
1137
1138                 switch (ev & ~EV_CLEANUP) {
1139                 case EV_PUT:
1140                         atomic_dec(&epoch->active);
1141                         break;
1142                 case EV_GOT_BARRIER_NR:
1143                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1144                         break;
1145                 case EV_BECAME_LAST:
1146                         /* nothing to do */
1147                         break;
1148                 }
1149
1150                 if (epoch_size != 0 &&
1151                     atomic_read(&epoch->active) == 0 &&
1152                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1153                         if (!(ev & EV_CLEANUP)) {
1154                                 spin_unlock(&tconn->epoch_lock);
1155                                 drbd_send_b_ack(epoch->mdev, epoch->barrier_nr, epoch_size);
1156                                 spin_lock(&tconn->epoch_lock);
1157                         }
1158                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1159                                 dec_unacked(epoch->mdev);
1160
1161                         if (tconn->current_epoch != epoch) {
1162                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1163                                 list_del(&epoch->list);
1164                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1165                                 tconn->epochs--;
1166                                 kfree(epoch);
1167
1168                                 if (rv == FE_STILL_LIVE)
1169                                         rv = FE_DESTROYED;
1170                         } else {
1171                                 epoch->flags = 0;
1172                                 atomic_set(&epoch->epoch_size, 0);
1173                                 /* atomic_set(&epoch->active, 0); is already zero */
1174                                 if (rv == FE_STILL_LIVE)
1175                                         rv = FE_RECYCLED;
1176                         }
1177                 }
1178
1179                 if (!next_epoch)
1180                         break;
1181
1182                 epoch = next_epoch;
1183         } while (1);
1184
1185         spin_unlock(&tconn->epoch_lock);
1186
1187         return rv;
1188 }
1189
1190 /**
1191  * drbd_bump_write_ordering() - Fall back to another write ordering method
1192  * @tconn:      DRBD connection.
1193  * @wo:         Write ordering method to try.
1194  */
1195 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1196 {
1197         struct disk_conf *dc;
1198         struct drbd_conf *mdev;
1199         enum write_ordering_e pwo;
1200         int vnr;
1201         static char *write_ordering_str[] = {
1202                 [WO_none] = "none",
1203                 [WO_drain_io] = "drain",
1204                 [WO_bdev_flush] = "flush",
1205         };
1206
1207         pwo = tconn->write_ordering;
1208         wo = min(pwo, wo);
1209         rcu_read_lock();
1210         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1211                 if (!get_ldev(mdev))
1212                         continue;
1213                 dc = rcu_dereference(mdev->ldev->disk_conf);
1214
1215                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1216                         wo = WO_drain_io;
1217                 if (wo == WO_drain_io && !dc->disk_drain)
1218                         wo = WO_none;
1219                 put_ldev(mdev);
1220         }
1221         rcu_read_unlock();
1222         tconn->write_ordering = wo;
1223         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1224                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1225 }
1226
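/* Illustrative sketch, not part of the original file: drbd_bump_write_ordering()
 * takes min(current, requested), so it can only demote the method, as
 * drbd_flush() above does after a failed flush.  A later request for a
 * stronger method is a no-op.  example_write_ordering_is_monotonic() is a
 * hypothetical helper. */
static void __maybe_unused example_write_ordering_is_monotonic(struct drbd_tconn *tconn)
{
        drbd_bump_write_ordering(tconn, WO_drain_io);   /* may demote flush -> drain */
        drbd_bump_write_ordering(tconn, WO_bdev_flush); /* cannot promote back to flush */
}
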
1227 /**
1228  * drbd_submit_peer_request() - Submit a peer request to the local backing device
1229  * @mdev:       DRBD device.
1230  * @peer_req:   peer request
1231  * @rw:         flag field, see bio->bi_rw
1232  *
1233  * May spread the pages to multiple bios,
1234  * depending on bio_add_page restrictions.
1235  *
1236  * Returns 0 if all bios have been submitted,
1237  * -ENOMEM if we could not allocate enough bios,
1238  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1239  *  single page to an empty bio (which should never happen and likely indicates
1240  *  that the lower level IO stack is in some way broken). This has been observed
1241  *  on certain Xen deployments.
1242  */
1243 /* TODO allocate from our own bio_set. */
1244 int drbd_submit_peer_request(struct drbd_conf *mdev,
1245                              struct drbd_peer_request *peer_req,
1246                              const unsigned rw, const int fault_type)
1247 {
1248         struct bio *bios = NULL;
1249         struct bio *bio;
1250         struct page *page = peer_req->pages;
1251         sector_t sector = peer_req->i.sector;
1252         unsigned ds = peer_req->i.size;
1253         unsigned n_bios = 0;
1254         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1255         int err = -ENOMEM;
1256
1257         /* In most cases, we will only need one bio.  But in case the lower
1258          * level restrictions happen to be different at this offset on this
1259          * side than those of the sending peer, we may need to submit the
1260          * request in more than one bio.
1261          *
1262          * Plain bio_alloc is good enough here, this is no DRBD internally
1263          * generated bio, but a bio allocated on behalf of the peer.
1264          */
1265 next_bio:
1266         bio = bio_alloc(GFP_NOIO, nr_pages);
1267         if (!bio) {
1268                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1269                 goto fail;
1270         }
1271         /* > peer_req->i.sector, unless this is the first bio */
1272         bio->bi_sector = sector;
1273         bio->bi_bdev = mdev->ldev->backing_bdev;
1274         bio->bi_rw = rw;
1275         bio->bi_private = peer_req;
1276         bio->bi_end_io = drbd_peer_request_endio;
1277
1278         bio->bi_next = bios;
1279         bios = bio;
1280         ++n_bios;
1281
1282         page_chain_for_each(page) {
1283                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1284                 if (!bio_add_page(bio, page, len, 0)) {
1285                         /* A single page must always be possible!
1286                          * But in case it fails anyway,
1287                          * we deal with it, and complain (below). */
1288                         if (bio->bi_vcnt == 0) {
1289                                 dev_err(DEV,
1290                                         "bio_add_page failed for len=%u, "
1291                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1292                                         len, (unsigned long long)bio->bi_sector);
1293                                 err = -ENOSPC;
1294                                 goto fail;
1295                         }
1296                         goto next_bio;
1297                 }
1298                 ds -= len;
1299                 sector += len >> 9;
1300                 --nr_pages;
1301         }
1302         D_ASSERT(page == NULL);
1303         D_ASSERT(ds == 0);
1304
1305         atomic_set(&peer_req->pending_bios, n_bios);
1306         do {
1307                 bio = bios;
1308                 bios = bios->bi_next;
1309                 bio->bi_next = NULL;
1310
1311                 drbd_generic_make_request(mdev, fault_type, bio);
1312         } while (bios);
1313         return 0;
1314
1315 fail:
1316         while (bios) {
1317                 bio = bios;
1318                 bios = bios->bi_next;
1319                 bio_put(bio);
1320         }
1321         return err;
1322 }
1323
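/* Illustrative sketch, not part of the original file: submitting a peer
 * request for writing, as the data receive path does further below; the
 * request may be split into several bios by drbd_submit_peer_request().
 * example_submit_write() is a hypothetical helper. */
static int __maybe_unused example_submit_write(struct drbd_conf *mdev,
                                               struct drbd_peer_request *peer_req)
{
        return drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_DT_WR);
}
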
1324 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1325                                              struct drbd_peer_request *peer_req)
1326 {
1327         struct drbd_interval *i = &peer_req->i;
1328
1329         drbd_remove_interval(&mdev->write_requests, i);
1330         drbd_clear_interval(i);
1331
1332         /* Wake up any processes waiting for this peer request to complete.  */
1333         if (i->waiting)
1334                 wake_up(&mdev->misc_wait);
1335 }
1336
1337 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1338 {
1339         struct drbd_conf *mdev;
1340         int vnr;
1341
1342         rcu_read_lock();
1343         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1344                 kref_get(&mdev->kref);
1345                 rcu_read_unlock();
1346                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1347                 kref_put(&mdev->kref, &drbd_minor_destroy);
1348                 rcu_read_lock();
1349         }
1350         rcu_read_unlock();
1351 }
1352
1353 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1354 {
1355         struct drbd_conf *mdev;
1356         int rv;
1357         struct p_barrier *p = pi->data;
1358         struct drbd_epoch *epoch;
1359
1360         mdev = vnr_to_mdev(tconn, pi->vnr);
1361         if (!mdev)
1362                 return -EIO;
1363
1364         inc_unacked(mdev);
1365
1366         tconn->current_epoch->barrier_nr = p->barrier;
1367         tconn->current_epoch->mdev = mdev;
1368         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1369
1370         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1371          * the activity log, which means it would not be resynced in case the
1372          * R_PRIMARY crashes now.
1373          * Therefore we must send the barrier_ack after the barrier request was
1374          * completed. */
1375         switch (tconn->write_ordering) {
1376         case WO_none:
1377                 if (rv == FE_RECYCLED)
1378                         return 0;
1379
1380                 /* receiver context, in the writeout path of the other node.
1381                  * avoid potential distributed deadlock */
1382                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1383                 if (epoch)
1384                         break;
1385                 else
1386                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1387                         /* Fall through */
1388
1389         case WO_bdev_flush:
1390         case WO_drain_io:
1391                 conn_wait_active_ee_empty(tconn);
1392                 drbd_flush(tconn);
1393
1394                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1395                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1396                         if (epoch)
1397                                 break;
1398                 }
1399
1400                 epoch = tconn->current_epoch;
1401                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1402
1403                 D_ASSERT(atomic_read(&epoch->active) == 0);
1404                 D_ASSERT(epoch->flags == 0);
1405
1406                 return 0;
1407         default:
1408                 dev_err(DEV, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1409                 return -EIO;
1410         }
1411
1412         epoch->flags = 0;
1413         atomic_set(&epoch->epoch_size, 0);
1414         atomic_set(&epoch->active, 0);
1415
1416         spin_lock(&tconn->epoch_lock);
1417         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1418                 list_add(&epoch->list, &tconn->current_epoch->list);
1419                 tconn->current_epoch = epoch;
1420                 tconn->epochs++;
1421         } else {
1422                 /* The current_epoch got recycled while we allocated this one... */
1423                 kfree(epoch);
1424         }
1425         spin_unlock(&tconn->epoch_lock);
1426
1427         return 0;
1428 }
1429
1430 /* used from receive_RSDataReply (recv_resync_read)
1431  * and from receive_Data */
1432 static struct drbd_peer_request *
1433 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1434               int data_size) __must_hold(local)
1435 {
1436         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1437         struct drbd_peer_request *peer_req;
1438         struct page *page;
1439         int dgs, ds, err;
1440         void *dig_in = mdev->tconn->int_dig_in;
1441         void *dig_vv = mdev->tconn->int_dig_vv;
1442         unsigned long *data;
1443
1444         dgs = 0;
1445         if (mdev->tconn->peer_integrity_tfm) {
1446                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1447                 /*
1448                  * FIXME: Receive the incoming digest into the receive buffer
1449                  *        here, together with its struct p_data?
1450                  */
1451                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1452                 if (err)
1453                         return NULL;
1454                 data_size -= dgs;
1455         }
1456
1457         if (!expect(data_size != 0))
1458                 return NULL;
1459         if (!expect(IS_ALIGNED(data_size, 512)))
1460                 return NULL;
1461         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1462                 return NULL;
1463
1464         /* even though we trust our peer,
1465          * we sometimes have to double check. */
1466         if (sector + (data_size>>9) > capacity) {
1467                 dev_err(DEV, "request from peer beyond end of local disk: "
1468                         "capacity: %llus < sector: %llus + size: %u\n",
1469                         (unsigned long long)capacity,
1470                         (unsigned long long)sector, data_size);
1471                 return NULL;
1472         }
1473
1474         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1475          * "criss-cross" setup, that might cause write-out on some other DRBD,
1476          * which in turn might block on the other node at this very place.  */
1477         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1478         if (!peer_req)
1479                 return NULL;
1480
1481         ds = data_size;
1482         page = peer_req->pages;
1483         page_chain_for_each(page) {
1484                 unsigned len = min_t(int, ds, PAGE_SIZE);
1485                 data = kmap(page);
1486                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1487                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1488                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1489                         data[0] = data[0] ^ (unsigned long)-1;
1490                 }
1491                 kunmap(page);
1492                 if (err) {
1493                         drbd_free_peer_req(mdev, peer_req);
1494                         return NULL;
1495                 }
1496                 ds -= len;
1497         }
1498
1499         if (dgs) {
1500                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1501                 if (memcmp(dig_in, dig_vv, dgs)) {
1502                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1503                                 (unsigned long long)sector, data_size);
1504                         drbd_free_peer_req(mdev, peer_req);
1505                         return NULL;
1506                 }
1507         }
1508         mdev->recv_cnt += data_size>>9;
1509         return peer_req;
1510 }
1511
1512 /* drbd_drain_block() just takes a data block
1513  * out of the socket input buffer, and discards it.
1514  */
1515 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1516 {
1517         struct page *page;
1518         int err = 0;
1519         void *data;
1520
1521         if (!data_size)
1522                 return 0;
1523
1524         page = drbd_alloc_pages(mdev, 1, 1);
1525
1526         data = kmap(page);
1527         while (data_size) {
1528                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1529
1530                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1531                 if (err)
1532                         break;
1533                 data_size -= len;
1534         }
1535         kunmap(page);
1536         drbd_free_pages(mdev, page, 0);
1537         return err;
1538 }
1539
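/* Receive the payload of a read reply (P_DATA_REPLY) directly into the pages
 * of the still pending local request's bio ("dless" = disk-less read). */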
1540 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1541                            sector_t sector, int data_size)
1542 {
1543         struct bio_vec *bvec;
1544         struct bio *bio;
1545         int dgs, err, i, expect;
1546         void *dig_in = mdev->tconn->int_dig_in;
1547         void *dig_vv = mdev->tconn->int_dig_vv;
1548
1549         dgs = 0;
1550         if (mdev->tconn->peer_integrity_tfm) {
1551                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1552                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1553                 if (err)
1554                         return err;
1555                 data_size -= dgs;
1556         }
1557
1558         /* optimistically update recv_cnt.  if receiving fails below,
1559          * we disconnect anyways, and counters will be reset. */
1560         mdev->recv_cnt += data_size>>9;
1561
1562         bio = req->master_bio;
1563         D_ASSERT(sector == bio->bi_sector);
1564
1565         bio_for_each_segment(bvec, bio, i) {
1566                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1567                 expect = min_t(int, data_size, bvec->bv_len);
1568                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1569                 kunmap(bvec->bv_page);
1570                 if (err)
1571                         return err;
1572                 data_size -= expect;
1573         }
1574
1575         if (dgs) {
1576                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1577                 if (memcmp(dig_in, dig_vv, dgs)) {
1578                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1579                         return -EINVAL;
1580                 }
1581         }
1582
1583         D_ASSERT(data_size == 0);
1584         return 0;
1585 }
1586
1587 /*
1588  * e_end_resync_block() is called in asender context via
1589  * drbd_finish_peer_reqs().
1590  */
1591 static int e_end_resync_block(struct drbd_work *w, int unused)
1592 {
1593         struct drbd_peer_request *peer_req =
1594                 container_of(w, struct drbd_peer_request, w);
1595         struct drbd_conf *mdev = w->mdev;
1596         sector_t sector = peer_req->i.sector;
1597         int err;
1598
1599         D_ASSERT(drbd_interval_empty(&peer_req->i));
1600
1601         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1602                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1603                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1604         } else {
1605                 /* Record failure to sync */
1606                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1607
1608                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1609         }
1610         dec_unacked(mdev);
1611
1612         return err;
1613 }
1614
1615 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1616 {
1617         struct drbd_peer_request *peer_req;
1618
1619         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1620         if (!peer_req)
1621                 goto fail;
1622
1623         dec_rs_pending(mdev);
1624
1625         inc_unacked(mdev);
1626         /* corresponding dec_unacked() in e_end_resync_block()
1627          * respective _drbd_clear_done_ee */
1628
1629         peer_req->w.cb = e_end_resync_block;
1630
1631         spin_lock_irq(&mdev->tconn->req_lock);
1632         list_add(&peer_req->w.list, &mdev->sync_ee);
1633         spin_unlock_irq(&mdev->tconn->req_lock);
1634
1635         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1636         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1637                 return 0;
1638
1639         /* don't care for the reason here */
1640         dev_err(DEV, "submit failed, triggering re-connect\n");
1641         spin_lock_irq(&mdev->tconn->req_lock);
1642         list_del(&peer_req->w.list);
1643         spin_unlock_irq(&mdev->tconn->req_lock);
1644
1645         drbd_free_peer_req(mdev, peer_req);
1646 fail:
1647         put_ldev(mdev);
1648         return -EIO;
1649 }
1650
1651 static struct drbd_request *
1652 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1653              sector_t sector, bool missing_ok, const char *func)
1654 {
1655         struct drbd_request *req;
1656
1657         /* Request object according to our peer */
1658         req = (struct drbd_request *)(unsigned long)id;
1659         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1660                 return req;
1661         if (!missing_ok) {
1662                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1663                         (unsigned long)id, (unsigned long long)sector);
1664         }
1665         return NULL;
1666 }
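
/*
 * Illustrative sketch (editor's note, not compiled into the driver): the
 * block_id round trip that find_request() relies on.  When we issue a
 * request we put its kernel address on the wire as the 64-bit block_id; the
 * peer echoes that id back unmodified, and drbd_contains_interval() then
 * confirms that the decoded pointer really is one of our still pending
 * requests before anyone dereferences it.  The helper name below is made up.
 */
#if 0
static void block_id_round_trip(struct drbd_request *req)
{
	u64 block_id = (unsigned long)req;	/* what goes out on the wire */
	struct drbd_request *echoed =
		(struct drbd_request *)(unsigned long)block_id;

	/* only trust "echoed" once the interval tree lookup has succeeded */
	BUG_ON(echoed != req);
}
#endif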
1667
1668 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1669 {
1670         struct drbd_conf *mdev;
1671         struct drbd_request *req;
1672         sector_t sector;
1673         int err;
1674         struct p_data *p = pi->data;
1675
1676         mdev = vnr_to_mdev(tconn, pi->vnr);
1677         if (!mdev)
1678                 return -EIO;
1679
1680         sector = be64_to_cpu(p->sector);
1681
1682         spin_lock_irq(&mdev->tconn->req_lock);
1683         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1684         spin_unlock_irq(&mdev->tconn->req_lock);
1685         if (unlikely(!req))
1686                 return -EIO;
1687
1688         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1689          * special casing it there for the various failure cases.
1690          * still no race with drbd_fail_pending_reads */
1691         err = recv_dless_read(mdev, req, sector, pi->size);
1692         if (!err)
1693                 req_mod(req, DATA_RECEIVED);
1694         /* else: nothing. handled from drbd_disconnect...
1695          * I don't think we may complete this just yet
1696          * in case we are "on-disconnect: freeze" */
1697
1698         return err;
1699 }
1700
1701 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1702 {
1703         struct drbd_conf *mdev;
1704         sector_t sector;
1705         int err;
1706         struct p_data *p = pi->data;
1707
1708         mdev = vnr_to_mdev(tconn, pi->vnr);
1709         if (!mdev)
1710                 return -EIO;
1711
1712         sector = be64_to_cpu(p->sector);
1713         D_ASSERT(p->block_id == ID_SYNCER);
1714
1715         if (get_ldev(mdev)) {
1716                 /* data is submitted to disk within recv_resync_read.
1717                  * corresponding put_ldev done below on error,
1718                  * or in drbd_peer_request_endio. */
1719                 err = recv_resync_read(mdev, sector, pi->size);
1720         } else {
1721                 if (__ratelimit(&drbd_ratelimit_state))
1722                         dev_err(DEV, "Can not write resync data to local disk.\n");
1723
1724                 err = drbd_drain_block(mdev, pi->size);
1725
1726                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1727         }
1728
1729         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1730
1731         return err;
1732 }
1733
1734 static int w_restart_write(struct drbd_work *w, int cancel)
1735 {
1736         struct drbd_request *req = container_of(w, struct drbd_request, w);
1737         struct drbd_conf *mdev = w->mdev;
1738         struct bio *bio;
1739         unsigned long start_time;
1740         unsigned long flags;
1741
1742         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1743         if (!expect(req->rq_state & RQ_POSTPONED)) {
1744                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1745                 return -EIO;
1746         }
1747         bio = req->master_bio;
1748         start_time = req->start_time;
1749         /* Postponed requests will not have their master_bio completed!  */
1750         __req_mod(req, DISCARD_WRITE, NULL);
1751         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1752
1753         while (__drbd_make_request(mdev, bio, start_time))
1754                 /* retry */ ;
1755         return 0;
1756 }
1757
1758 static void restart_conflicting_writes(struct drbd_conf *mdev,
1759                                        sector_t sector, int size)
1760 {
1761         struct drbd_interval *i;
1762         struct drbd_request *req;
1763
1764         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1765                 if (!i->local)
1766                         continue;
1767                 req = container_of(i, struct drbd_request, i);
1768                 if (req->rq_state & RQ_LOCAL_PENDING ||
1769                     !(req->rq_state & RQ_POSTPONED))
1770                         continue;
1771                 if (expect(list_empty(&req->w.list))) {
1772                         req->w.mdev = mdev;
1773                         req->w.cb = w_restart_write;
1774                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1775                 }
1776         }
1777 }
1778
1779 /*
1780  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1781  */
1782 static int e_end_block(struct drbd_work *w, int cancel)
1783 {
1784         struct drbd_peer_request *peer_req =
1785                 container_of(w, struct drbd_peer_request, w);
1786         struct drbd_conf *mdev = w->mdev;
1787         sector_t sector = peer_req->i.sector;
1788         int err = 0, pcmd;
1789
1790         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1791                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1792                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1793                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1794                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1795                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1796                         err = drbd_send_ack(mdev, pcmd, peer_req);
1797                         if (pcmd == P_RS_WRITE_ACK)
1798                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1799                 } else {
1800                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1801                         /* we expect it to be marked out of sync anyways...
1802                          * maybe assert this?  */
1803                 }
1804                 dec_unacked(mdev);
1805         }
1806         /* we delete from the conflict detection hash _after_ we sent out the
1807          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1808         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1809                 spin_lock_irq(&mdev->tconn->req_lock);
1810                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1811                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1812                 if (peer_req->flags & EE_RESTART_REQUESTS)
1813                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1814                 spin_unlock_irq(&mdev->tconn->req_lock);
1815         } else
1816                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1817
1818         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1819
1820         return err;
1821 }
1822
1823 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1824 {
1825         struct drbd_conf *mdev = w->mdev;
1826         struct drbd_peer_request *peer_req =
1827                 container_of(w, struct drbd_peer_request, w);
1828         int err;
1829
1830         err = drbd_send_ack(mdev, ack, peer_req);
1831         dec_unacked(mdev);
1832
1833         return err;
1834 }
1835
1836 static int e_send_discard_write(struct drbd_work *w, int unused)
1837 {
1838         return e_send_ack(w, P_DISCARD_WRITE);
1839 }
1840
1841 static int e_send_retry_write(struct drbd_work *w, int unused)
1842 {
1843         struct drbd_tconn *tconn = w->mdev->tconn;
1844
1845         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1846                              P_RETRY_WRITE : P_DISCARD_WRITE);
1847 }
1848
1849 static bool seq_greater(u32 a, u32 b)
1850 {
1851         /*
1852          * We assume 32-bit wrap-around here.
1853          * For 24-bit wrap-around, we would have to shift:
1854          *  a <<= 8; b <<= 8;
1855          */
1856         return (s32)a - (s32)b > 0;
1857 }
1858
1859 static u32 seq_max(u32 a, u32 b)
1860 {
1861         return seq_greater(a, b) ? a : b;
1862 }
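
/*
 * Illustrative sketch (editor's note, not compiled into the driver): how the
 * signed-difference trick in seq_greater() behaves across a 32-bit
 * wrap-around.  The values are made up for illustration.
 */
#if 0
static void seq_wrap_example(void)
{
	u32 before_wrap = 0xfffffffbU;	/* i.e. (u32)-5, shortly before the wrap */
	u32 after_wrap = 5;		/* shortly after the wrap */

	/* (s32)(5 - 0xfffffffb) == 10 > 0: the wrapped value is correctly
	 * seen as newer, although it is smaller as an unsigned number. */
	BUG_ON(!seq_greater(after_wrap, before_wrap));
	BUG_ON(seq_greater(before_wrap, after_wrap));

	/* and seq_max() therefore picks the logically newer one */
	BUG_ON(seq_max(before_wrap, after_wrap) != after_wrap);
}
#endif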
1863
1864 static bool need_peer_seq(struct drbd_conf *mdev)
1865 {
1866         struct drbd_tconn *tconn = mdev->tconn;
1867         int tp;
1868
1869         /*
1870          * We only need to keep track of the last packet_seq number of our peer
1871          * if we are in dual-primary mode and we have the discard flag set; see
1872          * handle_write_conflicts().
1873          */
1874
1875         rcu_read_lock();
1876         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1877         rcu_read_unlock();
1878
1879         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1880 }
1881
1882 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1883 {
1884         unsigned int newest_peer_seq;
1885
1886         if (need_peer_seq(mdev)) {
1887                 spin_lock(&mdev->peer_seq_lock);
1888                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1889                 mdev->peer_seq = newest_peer_seq;
1890                 spin_unlock(&mdev->peer_seq_lock);
1891                 /* wake up only if we actually changed mdev->peer_seq */
1892                 if (peer_seq == newest_peer_seq)
1893                         wake_up(&mdev->seq_wait);
1894         }
1895 }
1896
1897 /* Called from receive_Data.
1898  * Synchronize packets on sock with packets on msock.
1899  *
1900  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1901  * packet traveling on msock, they are still processed in the order they have
1902  * been sent.
1903  *
1904  * Note: we don't care for Ack packets overtaking P_DATA packets.
1905  *
1906  * In case packet_seq is larger than mdev->peer_seq number, there are
1907  * outstanding packets on the msock. We wait for them to arrive.
1908  * In case we are the logically next packet, we update mdev->peer_seq
1909  * ourselves. Correctly handles 32bit wrap around.
1910  *
1911  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1912  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1913  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1914  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1915  *
1916  * returns 0 if we may process the packet,
1917  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1918 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1919 {
1920         DEFINE_WAIT(wait);
1921         long timeout;
1922         int ret;
1923
1924         if (!need_peer_seq(mdev))
1925                 return 0;
1926
1927         spin_lock(&mdev->peer_seq_lock);
1928         for (;;) {
1929                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1930                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1931                         ret = 0;
1932                         break;
1933                 }
1934                 if (signal_pending(current)) {
1935                         ret = -ERESTARTSYS;
1936                         break;
1937                 }
1938                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1939                 spin_unlock(&mdev->peer_seq_lock);
1940                 rcu_read_lock();
1941                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1942                 rcu_read_unlock();
1943                 timeout = schedule_timeout(timeout);
1944                 spin_lock(&mdev->peer_seq_lock);
1945                 if (!timeout) {
1946                         ret = -ETIMEDOUT;
1947                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1948                         break;
1949                 }
1950         }
1951         spin_unlock(&mdev->peer_seq_lock);
1952         finish_wait(&mdev->seq_wait, &wait);
1953         return ret;
1954 }
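
/*
 * Illustrative sketch (editor's note, not compiled into the driver): the
 * condition that lets the loop above proceed.  A data packet carrying
 * packet_seq may be processed once it is at most the logically next one
 * after our recorded mdev->peer_seq; anything further ahead means packets on
 * the msock are still outstanding and we keep waiting.  Values are made up.
 */
#if 0
static bool may_process_now(u32 last_peer_seq, u32 packet_seq)
{
	/* same test as in wait_for_and_update_peer_seq() above */
	return !seq_greater(packet_seq - 1, last_peer_seq);
}

static void peer_seq_example(void)
{
	BUG_ON(!may_process_now(41, 42));	/* logically next: go ahead */
	BUG_ON(!may_process_now(41, 40));	/* old/duplicate: go ahead */
	BUG_ON(may_process_now(41, 44));	/* 42 and 43 still missing: wait */
}
#endif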
1955
1956 /* see also bio_flags_to_wire()
1957  * The REQ_* (DRBD_REQ_*) bio flags need to be mapped semantically to data packet
1958  * flags and back, because we may replicate to peers running other kernel versions. */
1959 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1960 {
1961         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1962                 (dpf & DP_FUA ? REQ_FUA : 0) |
1963                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1964                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1965 }
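
/*
 * Illustrative sketch (editor's note, not compiled into the driver): the
 * mapping above is meant to be the semantic inverse of bio_flags_to_wire()
 * on the sending side, so REQ_* flags survive the round trip even between
 * peers whose numeric REQ_* bit values differ.  Only wire_flags_to_bio()
 * itself is exercised here; the sending side is not shown in this file.
 */
#if 0
static void wire_flags_example(struct drbd_conf *mdev)
{
	u32 dpf = DP_RW_SYNC | DP_FUA | DP_FLUSH;

	/* a flush/FUA write arrives with the matching local REQ_* bits set */
	BUG_ON(wire_flags_to_bio(mdev, dpf) != (REQ_SYNC | REQ_FUA | REQ_FLUSH));
}
#endif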
1966
1967 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1968                                     unsigned int size)
1969 {
1970         struct drbd_interval *i;
1971
1972     repeat:
1973         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1974                 struct drbd_request *req;
1975                 struct bio_and_error m;
1976
1977                 if (!i->local)
1978                         continue;
1979                 req = container_of(i, struct drbd_request, i);
1980                 if (!(req->rq_state & RQ_POSTPONED))
1981                         continue;
1982                 req->rq_state &= ~RQ_POSTPONED;
1983                 __req_mod(req, NEG_ACKED, &m);
1984                 spin_unlock_irq(&mdev->tconn->req_lock);
1985                 if (m.bio)
1986                         complete_master_bio(mdev, &m);
1987                 spin_lock_irq(&mdev->tconn->req_lock);
1988                 goto repeat;
1989         }
1990 }
1991
1992 static int handle_write_conflicts(struct drbd_conf *mdev,
1993                                   struct drbd_peer_request *peer_req)
1994 {
1995         struct drbd_tconn *tconn = mdev->tconn;
1996         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1997         sector_t sector = peer_req->i.sector;
1998         const unsigned int size = peer_req->i.size;
1999         struct drbd_interval *i;
2000         bool equal;
2001         int err;
2002
2003         /*
2004          * Inserting the peer request into the write_requests tree will prevent
2005          * new conflicting local requests from being added.
2006          */
2007         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2008
2009     repeat:
2010         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2011                 if (i == &peer_req->i)
2012                         continue;
2013
2014                 if (!i->local) {
2015                         /*
2016                          * Our peer has sent a conflicting remote request; this
2017                          * should not happen in a two-node setup.  Wait for the
2018                          * earlier peer request to complete.
2019                          */
2020                         err = drbd_wait_misc(mdev, i);
2021                         if (err)
2022                                 goto out;
2023                         goto repeat;
2024                 }
2025
2026                 equal = i->sector == sector && i->size == size;
2027                 if (resolve_conflicts) {
2028                         /*
2029                          * If the peer request is fully contained within the
2030                          * overlapping request, it can be discarded; otherwise,
2031                          * it will be retried once all overlapping requests
2032                          * have completed.
2033                          */
2034                         bool discard = i->sector <= sector && i->sector +
2035                                        (i->size >> 9) >= sector + (size >> 9);
2036
2037                         if (!equal)
2038                                 dev_alert(DEV, "Concurrent writes detected: "
2039                                                "local=%llus +%u, remote=%llus +%u, "
2040                                                "assuming %s came first\n",
2041                                           (unsigned long long)i->sector, i->size,
2042                                           (unsigned long long)sector, size,
2043                                           discard ? "local" : "remote");
2044
2045                         inc_unacked(mdev);
2046                         peer_req->w.cb = discard ? e_send_discard_write :
2047                                                    e_send_retry_write;
2048                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2049                         wake_asender(mdev->tconn);
2050
2051                         err = -ENOENT;
2052                         goto out;
2053                 } else {
2054                         struct drbd_request *req =
2055                                 container_of(i, struct drbd_request, i);
2056
2057                         if (!equal)
2058                                 dev_alert(DEV, "Concurrent writes detected: "
2059                                                "local=%llus +%u, remote=%llus +%u\n",
2060                                           (unsigned long long)i->sector, i->size,
2061                                           (unsigned long long)sector, size);
2062
2063                         if (req->rq_state & RQ_LOCAL_PENDING ||
2064                             !(req->rq_state & RQ_POSTPONED)) {
2065                                 /*
2066                                  * Wait for the node with the discard flag to
2067                                  * decide if this request will be discarded or
2068                                  * retried.  Requests that are discarded will
2069                                  * disappear from the write_requests tree.
2070                                  *
2071                                  * In addition, wait for the conflicting
2072                                  * request to finish locally before submitting
2073                                  * the conflicting peer request.
2074                                  */
2075                                 err = drbd_wait_misc(mdev, &req->i);
2076                                 if (err) {
2077                                         _conn_request_state(mdev->tconn,
2078                                                             NS(conn, C_TIMEOUT),
2079                                                             CS_HARD);
2080                                         fail_postponed_requests(mdev, sector, size);
2081                                         goto out;
2082                                 }
2083                                 goto repeat;
2084                         }
2085                         /*
2086                          * Remember to restart the conflicting requests after
2087                          * the new peer request has completed.
2088                          */
2089                         peer_req->flags |= EE_RESTART_REQUESTS;
2090                 }
2091         }
2092         err = 0;
2093
2094     out:
2095         if (err)
2096                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2097         return err;
2098 }
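
/*
 * Illustrative sketch (editor's note, not compiled into the driver): the
 * containment test handle_write_conflicts() uses to decide whether a
 * conflicting peer write may simply be discarded.  Sizes are in bytes,
 * sectors are 512 bytes; the numbers are made up for illustration.
 */
#if 0
static bool peer_write_contained(sector_t local_sector, unsigned int local_size,
				 sector_t peer_sector, unsigned int peer_size)
{
	/* same expression as the "discard" calculation above */
	return local_sector <= peer_sector &&
	       local_sector + (local_size >> 9) >= peer_sector + (peer_size >> 9);
}

static void write_conflict_example(void)
{
	/* local write covers sectors 0..15, peer write covers 8..11:
	 * fully contained, may be answered with P_DISCARD_WRITE */
	BUG_ON(!peer_write_contained(0, 16 << 9, 8, 4 << 9));

	/* peer write 12..19 sticks out past the local write:
	 * not contained, the peer will be asked to retry it later */
	BUG_ON(peer_write_contained(0, 16 << 9, 12, 8 << 9));
}
#endif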
2099
2100 /* mirrored write */
2101 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2102 {
2103         struct drbd_conf *mdev;
2104         sector_t sector;
2105         struct drbd_peer_request *peer_req;
2106         struct p_data *p = pi->data;
2107         u32 peer_seq = be32_to_cpu(p->seq_num);
2108         int rw = WRITE;
2109         u32 dp_flags;
2110         int err, tp;
2111
2112         mdev = vnr_to_mdev(tconn, pi->vnr);
2113         if (!mdev)
2114                 return -EIO;
2115
2116         if (!get_ldev(mdev)) {
2117                 int err2;
2118
2119                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2120                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2121                 atomic_inc(&tconn->current_epoch->epoch_size);
2122                 err2 = drbd_drain_block(mdev, pi->size);
2123                 if (!err)
2124                         err = err2;
2125                 return err;
2126         }
2127
2128         /*
2129          * Corresponding put_ldev done either below (on various errors), or in
2130          * drbd_peer_request_endio, if we successfully submit the data at the
2131          * end of this function.
2132          */
2133
2134         sector = be64_to_cpu(p->sector);
2135         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2136         if (!peer_req) {
2137                 put_ldev(mdev);
2138                 return -EIO;
2139         }
2140
2141         peer_req->w.cb = e_end_block;
2142
2143         dp_flags = be32_to_cpu(p->dp_flags);
2144         rw |= wire_flags_to_bio(mdev, dp_flags);
2145
2146         if (dp_flags & DP_MAY_SET_IN_SYNC)
2147                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2148
2149         spin_lock(&tconn->epoch_lock);
2150         peer_req->epoch = tconn->current_epoch;
2151         atomic_inc(&peer_req->epoch->epoch_size);
2152         atomic_inc(&peer_req->epoch->active);
2153         spin_unlock(&tconn->epoch_lock);
2154
2155         rcu_read_lock();
2156         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2157         rcu_read_unlock();
2158         if (tp) {
2159                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2160                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2161                 if (err)
2162                         goto out_interrupted;
2163                 spin_lock_irq(&mdev->tconn->req_lock);
2164                 err = handle_write_conflicts(mdev, peer_req);
2165                 if (err) {
2166                         spin_unlock_irq(&mdev->tconn->req_lock);
2167                         if (err == -ENOENT) {
2168                                 put_ldev(mdev);
2169                                 return 0;
2170                         }
2171                         goto out_interrupted;
2172                 }
2173         } else
2174                 spin_lock_irq(&mdev->tconn->req_lock);
2175         list_add(&peer_req->w.list, &mdev->active_ee);
2176         spin_unlock_irq(&mdev->tconn->req_lock);
2177
2178         if (mdev->tconn->agreed_pro_version < 100) {
2179                 rcu_read_lock();
2180                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2181                 case DRBD_PROT_C:
2182                         dp_flags |= DP_SEND_WRITE_ACK;
2183                         break;
2184                 case DRBD_PROT_B:
2185                         dp_flags |= DP_SEND_RECEIVE_ACK;
2186                         break;
2187                 }
2188                 rcu_read_unlock();
2189         }
2190
2191         if (dp_flags & DP_SEND_WRITE_ACK) {
2192                 peer_req->flags |= EE_SEND_WRITE_ACK;
2193                 inc_unacked(mdev);
2194                 /* corresponding dec_unacked() in e_end_block()
2195                  * respective _drbd_clear_done_ee */
2196         }
2197
2198         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2199                 /* I really don't like it that the receiver thread
2200                  * sends on the msock, but anyways */
2201                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2202         }
2203
2204         if (mdev->state.pdsk < D_INCONSISTENT) {
2205                 /* In case we have the only disk of the cluster: the peer cannot store this write, so mark it out of sync */
2206                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2207                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2208                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2209                 drbd_al_begin_io(mdev, &peer_req->i);
2210         }
2211
2212         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2213         if (!err)
2214                 return 0;
2215
2216         /* don't care for the reason here */
2217         dev_err(DEV, "submit failed, triggering re-connect\n");
2218         spin_lock_irq(&mdev->tconn->req_lock);
2219         list_del(&peer_req->w.list);
2220         drbd_remove_epoch_entry_interval(mdev, peer_req);
2221         spin_unlock_irq(&mdev->tconn->req_lock);
2222         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2223                 drbd_al_complete_io(mdev, &peer_req->i);
2224
2225 out_interrupted:
2226         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2227         put_ldev(mdev);
2228         drbd_free_peer_req(mdev, peer_req);
2229         return err;
2230 }
2231
2232 /* We may throttle resync, if the lower device seems to be busy,
2233  * and current sync rate is above c_min_rate.
2234  *
2235  * To decide whether or not the lower device is busy, we use a scheme similar
2236  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2237  * (more than 64 sectors) of activity we cannot account for with our own resync
2238  * activity, it obviously is "busy".
2239  *
2240  * The current sync rate used here uses only the most recent two step marks,
2241  * to have a short time average so we can react faster.
2242  */
2243 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2244 {
2245         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2246         unsigned long db, dt, dbdt;
2247         struct lc_element *tmp;
2248         int curr_events;
2249         int throttle = 0;
2250         unsigned int c_min_rate;
2251
2252         rcu_read_lock();
2253         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2254         rcu_read_unlock();
2255
2256         /* feature disabled? */
2257         if (c_min_rate == 0)
2258                 return 0;
2259
2260         spin_lock_irq(&mdev->al_lock);
2261         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2262         if (tmp) {
2263                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2264                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2265                         spin_unlock_irq(&mdev->al_lock);
2266                         return 0;
2267                 }
2268                 /* Do not slow down if app IO is already waiting for this extent */
2269         }
2270         spin_unlock_irq(&mdev->al_lock);
2271
2272         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2273                       (int)part_stat_read(&disk->part0, sectors[1]) -
2274                         atomic_read(&mdev->rs_sect_ev);
2275
2276         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2277                 unsigned long rs_left;
2278                 int i;
2279
2280                 mdev->rs_last_events = curr_events;
2281
2282                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2283                  * approx. */
2284                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2285
2286                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2287                         rs_left = mdev->ov_left;
2288                 else
2289                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2290
2291                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2292                 if (!dt)
2293                         dt++;
2294                 db = mdev->rs_mark_left[i] - rs_left;
2295                 dbdt = Bit2KB(db/dt);
2296
2297                 if (dbdt > c_min_rate)
2298                         throttle = 1;
2299         }
2300         return throttle;
2301 }
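
/*
 * Illustrative sketch (editor's note, not compiled into the driver): the
 * short-time rate computed above.  db counts bitmap bits (4 KiB each) that
 * went in sync between the two most recent sync marks, dt the seconds in
 * between, so Bit2KB(db/dt) is the recent resync rate in KiB/s to compare
 * against the configured c_min_rate.  Numbers are made up for illustration.
 */
#if 0
static void resync_throttle_example(void)
{
	unsigned long db = 51200;		/* bits gone in sync since the older mark */
	unsigned long dt = 4;			/* seconds between the two marks */
	unsigned int c_min_rate = 4096;		/* configured minimum resync rate, KiB/s */
	unsigned long dbdt = Bit2KB(db / dt);	/* 12800 bits/s * 4 KiB == 51200 KiB/s */

	/* the recent rate exceeds c_min_rate, so - given that the backing
	 * device also shows enough unrelated activity - we throttle and the
	 * receiver sleeps for ~100ms before issuing the resync request */
	BUG_ON(!(dbdt > c_min_rate));
}
#endif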
2302
2303
2304 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2305 {
2306         struct drbd_conf *mdev;
2307         sector_t sector;
2308         sector_t capacity;
2309         struct drbd_peer_request *peer_req;
2310         struct digest_info *di = NULL;
2311         int size, verb;
2312         unsigned int fault_type;
2313         struct p_block_req *p = pi->data;
2314
2315         mdev = vnr_to_mdev(tconn, pi->vnr);
2316         if (!mdev)
2317                 return -EIO;
2318         capacity = drbd_get_capacity(mdev->this_bdev);
2319
2320         sector = be64_to_cpu(p->sector);
2321         size   = be32_to_cpu(p->blksize);
2322
2323         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2324                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2325                                 (unsigned long long)sector, size);
2326                 return -EINVAL;
2327         }
2328         if (sector + (size>>9) > capacity) {
2329                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2330                                 (unsigned long long)sector, size);
2331                 return -EINVAL;
2332         }
2333
2334         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2335                 verb = 1;
2336                 switch (pi->cmd) {
2337                 case P_DATA_REQUEST:
2338                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2339                         break;
2340                 case P_RS_DATA_REQUEST:
2341                 case P_CSUM_RS_REQUEST:
2342                 case P_OV_REQUEST:
2343                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2344                         break;
2345                 case P_OV_REPLY:
2346                         verb = 0;
2347                         dec_rs_pending(mdev);
2348                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2349                         break;
2350                 default:
2351                         BUG();
2352                 }
2353                 if (verb && __ratelimit(&drbd_ratelimit_state))
2354                         dev_err(DEV, "Can not satisfy peer's read request, "
2355                             "no local data.\n");
2356
2357                 /* drain the payload, if any */
2358                 return drbd_drain_block(mdev, pi->size);
2359         }
2360
2361         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2362          * "criss-cross" setup, that might cause write-out on some other DRBD,
2363          * which in turn might block on the other node at this very place.  */
2364         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2365         if (!peer_req) {
2366                 put_ldev(mdev);
2367                 return -ENOMEM;
2368         }
2369
2370         switch (pi->cmd) {
2371         case P_DATA_REQUEST:
2372                 peer_req->w.cb = w_e_end_data_req;
2373                 fault_type = DRBD_FAULT_DT_RD;
2374                 /* application IO, don't drbd_rs_begin_io */
2375                 goto submit;
2376
2377         case P_RS_DATA_REQUEST:
2378                 peer_req->w.cb = w_e_end_rsdata_req;
2379                 fault_type = DRBD_FAULT_RS_RD;
2380                 /* used in the sector offset progress display */
2381                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2382                 break;
2383
2384         case P_OV_REPLY:
2385         case P_CSUM_RS_REQUEST:
2386                 fault_type = DRBD_FAULT_RS_RD;
2387                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2388                 if (!di)
2389                         goto out_free_e;
2390
2391                 di->digest_size = pi->size;
2392                 di->digest = (((char *)di)+sizeof(struct digest_info));
2393
2394                 peer_req->digest = di;
2395                 peer_req->flags |= EE_HAS_DIGEST;
2396
2397                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2398                         goto out_free_e;
2399
2400                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2401                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2402                         peer_req->w.cb = w_e_end_csum_rs_req;
2403                         /* used in the sector offset progress display */
2404                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2405                 } else if (pi->cmd == P_OV_REPLY) {
2406                         /* track progress, we may need to throttle */
2407                         atomic_add(size >> 9, &mdev->rs_sect_in);
2408                         peer_req->w.cb = w_e_end_ov_reply;
2409                         dec_rs_pending(mdev);
2410                         /* drbd_rs_begin_io done when we sent this request,
2411                          * but accounting still needs to be done. */
2412                         goto submit_for_resync;
2413                 }
2414                 break;
2415
2416         case P_OV_REQUEST:
2417                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2418                     mdev->tconn->agreed_pro_version >= 90) {
2419                         unsigned long now = jiffies;
2420                         int i;
2421                         mdev->ov_start_sector = sector;
2422                         mdev->ov_position = sector;
2423                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2424                         mdev->rs_total = mdev->ov_left;
2425                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2426                                 mdev->rs_mark_left[i] = mdev->ov_left;
2427                                 mdev->rs_mark_time[i] = now;
2428                         }
2429                         dev_info(DEV, "Online Verify start sector: %llu\n",
2430                                         (unsigned long long)sector);
2431                 }
2432                 peer_req->w.cb = w_e_end_ov_req;
2433                 fault_type = DRBD_FAULT_RS_RD;
2434                 break;
2435
2436         default:
2437                 BUG();
2438         }
2439
2440         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2441          * wrt the receiver, but it is not as straightforward as it may seem.
2442          * Various places in the resync start and stop logic assume resync
2443          * requests are processed in order, requeuing this on the worker thread
2444          * requests are processed in order; requeuing this on the worker thread
2445          * would introduce a bunch of new code for synchronization between threads.
2446          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2447          * "forever", throttling after drbd_rs_begin_io will lock that extent
2448          * for application writes for the same time.  For now, just throttle
2449          * here, where the rest of the code expects the receiver to sleep for
2450          * a while, anyways.
2451          */
2452
2453         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2454          * this defers syncer requests for some time, before letting at least
2455          * one request through.  The resync controller on the receiving side
2456          * will adapt to the incoming rate accordingly.
2457          *
2458          * We cannot throttle here if remote is Primary/SyncTarget:
2459          * we would also throttle its application reads.
2460          * In that case, throttling is done on the SyncTarget only.
2461          */
2462         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2463                 schedule_timeout_uninterruptible(HZ/10);
2464         if (drbd_rs_begin_io(mdev, sector))
2465                 goto out_free_e;
2466
2467 submit_for_resync:
2468         atomic_add(size >> 9, &mdev->rs_sect_ev);
2469
2470 submit:
2471         inc_unacked(mdev);
2472         spin_lock_irq(&mdev->tconn->req_lock);
2473         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2474         spin_unlock_irq(&mdev->tconn->req_lock);
2475
2476         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2477                 return 0;
2478
2479         /* don't care for the reason here */
2480         dev_err(DEV, "submit failed, triggering re-connect\n");
2481         spin_lock_irq(&mdev->tconn->req_lock);
2482         list_del(&peer_req->w.list);
2483         spin_unlock_irq(&mdev->tconn->req_lock);
2484         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2485
2486 out_free_e:
2487         put_ldev(mdev);
2488         drbd_free_peer_req(mdev, peer_req);
2489         return -EIO;
2490 }
2491
2492 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2493 {
2494         int self, peer, rv = -100;
2495         unsigned long ch_self, ch_peer;
2496         enum drbd_after_sb_p after_sb_0p;
2497
2498         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2499         peer = mdev->p_uuid[UI_BITMAP] & 1;
2500
2501         ch_peer = mdev->p_uuid[UI_SIZE];
2502         ch_self = mdev->comm_bm_set;
2503
2504         rcu_read_lock();
2505         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2506         rcu_read_unlock();
2507         switch (after_sb_0p) {
2508         case ASB_CONSENSUS:
2509         case ASB_DISCARD_SECONDARY:
2510         case ASB_CALL_HELPER:
2511         case ASB_VIOLENTLY:
2512                 dev_err(DEV, "Configuration error.\n");
2513                 break;
2514         case ASB_DISCONNECT:
2515                 break;
2516         case ASB_DISCARD_YOUNGER_PRI:
2517                 if (self == 0 && peer == 1) {
2518                         rv = -1;
2519                         break;
2520                 }
2521                 if (self == 1 && peer == 0) {
2522                         rv =  1;
2523                         break;
2524                 }
2525                 /* Else fall through to one of the other strategies... */
2526         case ASB_DISCARD_OLDER_PRI:
2527                 if (self == 0 && peer == 1) {
2528                         rv = 1;
2529                         break;
2530                 }
2531                 if (self == 1 && peer == 0) {
2532                         rv = -1;
2533                         break;
2534                 }
2535                 /* Else fall through to one of the other strategies... */
2536                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2537                      "Using discard-least-changes instead\n");
2538         case ASB_DISCARD_ZERO_CHG:
2539                 if (ch_peer == 0 && ch_self == 0) {
2540                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2541                                 ? -1 : 1;
2542                         break;
2543                 } else {
2544                         if (ch_peer == 0) { rv =  1; break; }
2545                         if (ch_self == 0) { rv = -1; break; }
2546                 }
2547                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2548                         break;
2549         case ASB_DISCARD_LEAST_CHG:
2550                 if      (ch_self < ch_peer)
2551                         rv = -1;
2552                 else if (ch_self > ch_peer)
2553                         rv =  1;
2554                 else /* ( ch_self == ch_peer ) */
2555                      /* Well, then use something else. */
2556                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2557                                 ? -1 : 1;
2558                 break;
2559         case ASB_DISCARD_LOCAL:
2560                 rv = -1;
2561                 break;
2562         case ASB_DISCARD_REMOTE:
2563                 rv =  1;
2564         }
2565
2566         return rv;
2567 }
2568
2569 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2570 {
2571         int hg, rv = -100;
2572         enum drbd_after_sb_p after_sb_1p;
2573
2574         rcu_read_lock();
2575         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2576         rcu_read_unlock();
2577         switch (after_sb_1p) {
2578         case ASB_DISCARD_YOUNGER_PRI:
2579         case ASB_DISCARD_OLDER_PRI:
2580         case ASB_DISCARD_LEAST_CHG:
2581         case ASB_DISCARD_LOCAL:
2582         case ASB_DISCARD_REMOTE:
2583         case ASB_DISCARD_ZERO_CHG:
2584                 dev_err(DEV, "Configuration error.\n");
2585                 break;
2586         case ASB_DISCONNECT:
2587                 break;
2588         case ASB_CONSENSUS:
2589                 hg = drbd_asb_recover_0p(mdev);
2590                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2591                         rv = hg;
2592                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2593                         rv = hg;
2594                 break;
2595         case ASB_VIOLENTLY:
2596                 rv = drbd_asb_recover_0p(mdev);
2597                 break;
2598         case ASB_DISCARD_SECONDARY:
2599                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2600         case ASB_CALL_HELPER:
2601                 hg = drbd_asb_recover_0p(mdev);
2602                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2603                         enum drbd_state_rv rv2;
2604
2605                         drbd_set_role(mdev, R_SECONDARY, 0);
2606                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2607                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2608                           * we do not need to wait for the after state change work either. */
2609                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2610                         if (rv2 != SS_SUCCESS) {
2611                                 drbd_khelper(mdev, "pri-lost-after-sb");
2612                         } else {
2613                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2614                                 rv = hg;
2615                         }
2616                 } else
2617                         rv = hg;
2618         }
2619
2620         return rv;
2621 }
2622
2623 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2624 {
2625         int hg, rv = -100;
2626         enum drbd_after_sb_p after_sb_2p;
2627
2628         rcu_read_lock();
2629         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2630         rcu_read_unlock();
2631         switch (after_sb_2p) {
2632         case ASB_DISCARD_YOUNGER_PRI:
2633         case ASB_DISCARD_OLDER_PRI:
2634         case ASB_DISCARD_LEAST_CHG:
2635         case ASB_DISCARD_LOCAL:
2636         case ASB_DISCARD_REMOTE:
2637         case ASB_CONSENSUS:
2638         case ASB_DISCARD_SECONDARY:
2639         case ASB_DISCARD_ZERO_CHG:
2640                 dev_err(DEV, "Configuration error.\n");
2641                 break;
2642         case ASB_VIOLENTLY:
2643                 rv = drbd_asb_recover_0p(mdev);
2644                 break;
2645         case ASB_DISCONNECT:
2646                 break;
2647         case ASB_CALL_HELPER:
2648                 hg = drbd_asb_recover_0p(mdev);
2649                 if (hg == -1) {
2650                         enum drbd_state_rv rv2;
2651
2652                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2653                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2654                           * we do not need to wait for the after state change work either. */
2655                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2656                         if (rv2 != SS_SUCCESS) {
2657                                 drbd_khelper(mdev, "pri-lost-after-sb");
2658                         } else {
2659                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2660                                 rv = hg;
2661                         }
2662                 } else
2663                         rv = hg;
2664         }
2665
2666         return rv;
2667 }
2668
2669 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2670                            u64 bits, u64 flags)
2671 {
2672         if (!uuid) {
2673                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2674                 return;
2675         }
2676         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2677              text,
2678              (unsigned long long)uuid[UI_CURRENT],
2679              (unsigned long long)uuid[UI_BITMAP],
2680              (unsigned long long)uuid[UI_HISTORY_START],
2681              (unsigned long long)uuid[UI_HISTORY_END],
2682              (unsigned long long)bits,
2683              (unsigned long long)flags);
2684 }
2685
2686 /*
2687   100   after split brain try auto recover
2688     2   C_SYNC_SOURCE set BitMap
2689     1   C_SYNC_SOURCE use BitMap
2690     0   no Sync
2691    -1   C_SYNC_TARGET use BitMap
2692    -2   C_SYNC_TARGET set BitMap
2693  -100   after split brain, disconnect
2694 -1000   unrelated data
2695 -1091   requires proto 91
2696 -1096   requires proto 96
2697  */
2698 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2699 {
2700         u64 self, peer;
2701         int i, j;
2702
2703         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2704         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2705
2706         *rule_nr = 10;
2707         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2708                 return 0;
2709
2710         *rule_nr = 20;
2711         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2712              peer != UUID_JUST_CREATED)
2713                 return -2;
2714
2715         *rule_nr = 30;
2716         if (self != UUID_JUST_CREATED &&
2717             (peer == UUID_JUST_CREATED || peer == (u64)0))
2718                 return 2;
2719
2720         if (self == peer) {
2721                 int rct, dc; /* roles at crash time */
2722
2723                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2724
2725                         if (mdev->tconn->agreed_pro_version < 91)
2726                                 return -1091;
2727
2728                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2729                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2730                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2731                                 drbd_uuid_set_bm(mdev, 0UL);
2732
2733                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2734                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2735                                 *rule_nr = 34;
2736                         } else {
2737                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2738                                 *rule_nr = 36;
2739                         }
2740
2741                         return 1;
2742                 }
2743
2744                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2745
2746                         if (mdev->tconn->agreed_pro_version < 91)
2747                                 return -1091;
2748
2749                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2750                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2751                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2752
2753                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2754                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2755                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2756
2757                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2758                                 *rule_nr = 35;
2759                         } else {
2760                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2761                                 *rule_nr = 37;
2762                         }
2763
2764                         return -1;
2765                 }
2766
2767                 /* Common power [off|failure] */
2768                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2769                         (mdev->p_uuid[UI_FLAGS] & 2);
2770                 /* lowest bit is set when we were primary,
2771                  * next bit (weight 2) is set when peer was primary */
2772                 *rule_nr = 40;
2773
2774                 switch (rct) {
2775                 case 0: /* !self_pri && !peer_pri */ return 0;
2776                 case 1: /*  self_pri && !peer_pri */ return 1;
2777                 case 2: /* !self_pri &&  peer_pri */ return -1;
2778                 case 3: /*  self_pri &&  peer_pri */
2779                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2780                         return dc ? -1 : 1;
2781                 }
2782         }
2783
2784         *rule_nr = 50;
2785         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2786         if (self == peer)
2787                 return -1;
2788
2789         *rule_nr = 51;
2790         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2791         if (self == peer) {
2792                 if (mdev->tconn->agreed_pro_version < 96 ?
2793                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2794                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2795                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2796                         /* The last P_SYNC_UUID did not get through. Undo the modifications the
2797                            peer made to its UUIDs when it last started a resync as sync source. */
2798
2799                         if (mdev->tconn->agreed_pro_version < 91)
2800                                 return -1091;
2801
2802                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2803                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2804
2805                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2806                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2807
2808                         return -1;
2809                 }
2810         }
2811
2812         *rule_nr = 60;
2813         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2814         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2815                 peer = mdev->p_uuid[i] & ~((u64)1);
2816                 if (self == peer)
2817                         return -2;
2818         }
2819
2820         *rule_nr = 70;
2821         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2822         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2823         if (self == peer)
2824                 return 1;
2825
2826         *rule_nr = 71;
2827         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2828         if (self == peer) {
2829                 if (mdev->tconn->agreed_pro_version < 96 ?
2830                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2831                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2832                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2833                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2834                            we made to our UUIDs when we last started a resync as sync source. */
2835
2836                         if (mdev->tconn->agreed_pro_version < 91)
2837                                 return -1091;
2838
2839                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2840                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2841
2842                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2843                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2844                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2845
2846                         return 1;
2847                 }
2848         }
2849
2850
2851         *rule_nr = 80;
2852         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2853         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2854                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2855                 if (self == peer)
2856                         return 2;
2857         }
2858
2859         *rule_nr = 90;
2860         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2861         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2862         if (self == peer && self != ((u64)0))
2863                 return 100;
2864
2865         *rule_nr = 100;
2866         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2867                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2868                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2869                         peer = mdev->p_uuid[j] & ~((u64)1);
2870                         if (self == peer)
2871                                 return -100;
2872                 }
2873         }
2874
2875         return -1000;
2876 }
2877
2878 /* drbd_sync_handshake() returns the new conn state on success, or
2879    C_MASK on failure.
2880  */
2881 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2882                                            enum drbd_disk_state peer_disk) __must_hold(local)
2883 {
2884         enum drbd_conns rv = C_MASK;
2885         enum drbd_disk_state mydisk;
2886         struct net_conf *nc;
2887         int hg, rule_nr, rr_conflict, tentative;
2888
2889         mydisk = mdev->state.disk;
2890         if (mydisk == D_NEGOTIATING)
2891                 mydisk = mdev->new_state_tmp.disk;
2892
2893         dev_info(DEV, "drbd_sync_handshake:\n");
2894         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2895         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2896                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2897
2898         hg = drbd_uuid_compare(mdev, &rule_nr);
2899
2900         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2901
2902         if (hg == -1000) {
2903                 dev_alert(DEV, "Unrelated data, aborting!\n");
2904                 return C_MASK;
2905         }
2906         if (hg < -1000) {
2907                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2908                 return C_MASK;
2909         }
2910
2911         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2912             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2913                 int f = (hg == -100) || abs(hg) == 2;
2914                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2915                 if (f)
2916                         hg = hg*2;
2917                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2918                      hg > 0 ? "source" : "target");
2919         }
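        /* The UUID verdict is overridden here: the side with the better disk
         * state becomes the source.  Doubling hg merely preserves the "full
         * sync required" property when the UUID comparison had already asked
         * for one (abs(hg) == 2) or had found unresolved split brain (-100). */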
2920
2921         if (abs(hg) == 100)
2922                 drbd_khelper(mdev, "initial-split-brain");
2923
2924         rcu_read_lock();
2925         nc = rcu_dereference(mdev->tconn->net_conf);
2926
2927         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2928                 int pcount = (mdev->state.role == R_PRIMARY)
2929                            + (peer_role == R_PRIMARY);
2930                 int forced = (hg == -100);
2931
2932                 switch (pcount) {
2933                 case 0:
2934                         hg = drbd_asb_recover_0p(mdev);
2935                         break;
2936                 case 1:
2937                         hg = drbd_asb_recover_1p(mdev);
2938                         break;
2939                 case 2:
2940                         hg = drbd_asb_recover_2p(mdev);
2941                         break;
2942                 }
2943                 if (abs(hg) < 100) {
2944                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2945                              "automatically solved. Sync from %s node\n",
2946                              pcount, (hg < 0) ? "peer" : "this");
2947                         if (forced) {
2948                                 dev_warn(DEV, "Doing a full sync, since"
2949                                      " UUIDs were ambiguous.\n");
2950                                 hg = hg*2;
2951                         }
2952                 }
2953         }
2954
2955         if (hg == -100) {
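                /* Manual split-brain policy: a node whose DISCARD_MY_DATA flag
                 * is set (the discard-my-data setting) volunteers to become
                 * sync target (hg = -1); if instead the peer advertises that
                 * flag (lowest bit of its UI_FLAGS), we become the source
                 * (hg = 1).  If both or neither do, hg stays -100 and the
                 * connection is dropped further down. */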
2956                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2957                         hg = -1;
2958                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2959                         hg = 1;
2960
2961                 if (abs(hg) < 100)
2962                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2963                              "Sync from %s node\n",
2964                              (hg < 0) ? "peer" : "this");
2965         }
2966         rr_conflict = nc->rr_conflict;
2967         tentative = nc->tentative;
2968         rcu_read_unlock();
2969
2970         if (hg == -100) {
2971                 /* FIXME this log message is not correct if we end up here
2972                  * after an attempted attach on a diskless node.
2973                  * We just refuse to attach -- well, we drop the "connection"
2974                  * to that disk, in a way... */
2975                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2976                 drbd_khelper(mdev, "split-brain");
2977                 return C_MASK;
2978         }
2979
2980         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2981                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2982                 return C_MASK;
2983         }
2984
2985         if (hg < 0 && /* by intention we do not use mydisk here. */
2986             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2987                 switch (rr_conflict) {
2988                 case ASB_CALL_HELPER:
2989                         drbd_khelper(mdev, "pri-lost");
2990                         /* fall through */
2991                 case ASB_DISCONNECT:
2992                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2993                         return C_MASK;
2994                 case ASB_VIOLENTLY:
2995                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2996                              "assumption\n");
2997                 }
2998         }
2999
3000         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3001                 if (hg == 0)
3002                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3003                 else
3004                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
3005                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3006                                  abs(hg) >= 2 ? "full" : "bit-map based");
3007                 return C_MASK;
3008         }
3009
3010         if (abs(hg) >= 2) {
3011                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3012                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3013                                         BM_LOCKED_SET_ALLOWED))
3014                         return C_MASK;
3015         }
3016
3017         if (hg > 0) { /* become sync source. */
3018                 rv = C_WF_BITMAP_S;
3019         } else if (hg < 0) { /* become sync target */
3020                 rv = C_WF_BITMAP_T;
3021         } else {
3022                 rv = C_CONNECTED;
3023                 if (drbd_bm_total_weight(mdev)) {
3024                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3025                              drbd_bm_total_weight(mdev));
3026                 }
3027         }
3028
3029         return rv;
3030 }
3031
3032 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3033 {
3034         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3035         if (peer == ASB_DISCARD_REMOTE)
3036                 return ASB_DISCARD_LOCAL;
3037
3038         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3039         if (peer == ASB_DISCARD_LOCAL)
3040                 return ASB_DISCARD_REMOTE;
3041
3042         /* everything else is valid if they are equal on both sides. */
3043         return peer;
3044 }
3045
3046 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3047 {
3048         struct p_protocol *p = pi->data;
3049         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3050         int p_proto, p_discard_my_data, p_two_primaries, cf;
3051         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3052         char integrity_alg[SHARED_SECRET_MAX] = "";
3053         struct crypto_hash *peer_integrity_tfm = NULL;
3054         void *int_dig_in = NULL, *int_dig_vv = NULL;
3055
3056         p_proto         = be32_to_cpu(p->protocol);
3057         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3058         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3059         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3060         p_two_primaries = be32_to_cpu(p->two_primaries);
3061         cf              = be32_to_cpu(p->conn_flags);
3062         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3063
3064         if (tconn->agreed_pro_version >= 87) {
3065                 int err;
3066
3067                 if (pi->size > sizeof(integrity_alg))
3068                         return -EIO;
3069                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3070                 if (err)
3071                         return err;
3072                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3073         }
3074
3075         if (pi->cmd != P_PROTOCOL_UPDATE) {
3076                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3077
3078                 if (cf & CF_DRY_RUN)
3079                         set_bit(CONN_DRY_RUN, &tconn->flags);
3080
3081                 rcu_read_lock();
3082                 nc = rcu_dereference(tconn->net_conf);
3083
3084                 if (p_proto != nc->wire_protocol) {
3085                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3086                         goto disconnect_rcu_unlock;
3087                 }
3088
3089                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3090                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3091                         goto disconnect_rcu_unlock;
3092                 }
3093
3094                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3095                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3096                         goto disconnect_rcu_unlock;
3097                 }
3098
3099                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3100                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3101                         goto disconnect_rcu_unlock;
3102                 }
3103
3104                 if (p_discard_my_data && nc->discard_my_data) {
3105                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3106                         goto disconnect_rcu_unlock;
3107                 }
3108
3109                 if (p_two_primaries != nc->two_primaries) {
3110                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3111                         goto disconnect_rcu_unlock;
3112                 }
3113
3114                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3115                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3116                         goto disconnect_rcu_unlock;
3117                 }
3118
3119                 rcu_read_unlock();
3120         }
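        /* For P_PROTOCOL_UPDATE the peer is changing its settings at runtime,
         * so differences from our current net_conf are expected; the
         * compatibility checks above only apply to the initial P_PROTOCOL
         * exchange. */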
3121
3122         if (integrity_alg[0]) {
3123                 int hash_size;
3124
3125                 /*
3126                  * We can only change the peer data integrity algorithm
3127                  * here.  Changing our own data integrity algorithm
3128                  * requires that we send a P_PROTOCOL_UPDATE packet at
3129                  * the same time; otherwise, the peer has no way to
3130                  * tell between which packets the algorithm should
3131                  * change.
3132                  */
3133
3134                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3135                 if (!peer_integrity_tfm) {
3136                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3137                                  integrity_alg);
3138                         goto disconnect;
3139                 }
3140
3141                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3142                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3143                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3144                 if (!(int_dig_in && int_dig_vv)) {
3145                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3146                         goto disconnect;
3147                 }
3148         }
3149
3150         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3151         if (!new_net_conf) {
3152                 conn_err(tconn, "Allocation of new net_conf failed\n");
3153                 goto disconnect;
3154         }
3155
3156         mutex_lock(&tconn->data.mutex);
3157         mutex_lock(&tconn->conf_update);
3158         old_net_conf = tconn->net_conf;
3159         *new_net_conf = *old_net_conf;
3160
3161         new_net_conf->wire_protocol = p_proto;
3162         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3163         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3164         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3165         new_net_conf->two_primaries = p_two_primaries;
3166
3167         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3168         mutex_unlock(&tconn->conf_update);
3169         mutex_unlock(&tconn->data.mutex);
3170
3171         crypto_free_hash(tconn->peer_integrity_tfm);
3172         kfree(tconn->int_dig_in);
3173         kfree(tconn->int_dig_vv);
3174         tconn->peer_integrity_tfm = peer_integrity_tfm;
3175         tconn->int_dig_in = int_dig_in;
3176         tconn->int_dig_vv = int_dig_vv;
3177
3178         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3179                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3180                           integrity_alg[0] ? integrity_alg : "(none)");
3181
3182         synchronize_rcu();
3183         kfree(old_net_conf);
3184         return 0;
3185
3186 disconnect_rcu_unlock:
3187         rcu_read_unlock();
3188 disconnect:
3189         crypto_free_hash(peer_integrity_tfm);
3190         kfree(int_dig_in);
3191         kfree(int_dig_vv);
3192         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3193         return -EIO;
3194 }
3195
3196 /* helper function
3197  * input: alg name, feature name
3198  * return: NULL (alg name was "")
3199  *         ERR_PTR(error) if something goes wrong
3200  *         or the crypto hash ptr, if it worked out ok. */
3201 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3202                 const char *alg, const char *name)
3203 {
3204         struct crypto_hash *tfm;
3205
3206         if (!alg[0])
3207                 return NULL;
3208
3209         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3210         if (IS_ERR(tfm)) {
3211                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3212                         alg, name, PTR_ERR(tfm));
3213                 return tfm;
3214         }
3215         return tfm;
3216 }
3217
3218 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3219 {
3220         void *buffer = tconn->data.rbuf;
3221         int size = pi->size;
3222
3223         while (size) {
3224                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3225                 s = drbd_recv(tconn, buffer, s);
3226                 if (s <= 0) {
3227                         if (s < 0)
3228                                 return s;
3229                         break;
3230                 }
3231                 size -= s;
3232         }
3233         if (size)
3234                 return -EIO;
3235         return 0;
3236 }
3237
3238 /*
3239  * config_unknown_volume  -  device configuration command for unknown volume
3240  *
3241  * When a device is added to an existing connection, the node on which the
3242  * device is added first will send configuration commands to its peer but the
3243  * peer will not know about the device yet.  It will warn and ignore these
3244  * commands.  Once the device is added on the second node, the second node will
3245  * send the same device configuration commands, but in the other direction.
3246  *
3247  * (We can also end up here if drbd is misconfigured.)
3248  */
3249 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3250 {
3251         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3252                   cmdname(pi->cmd), pi->vnr);
3253         return ignore_remaining_packet(tconn, pi);
3254 }
3255
3256 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3257 {
3258         struct drbd_conf *mdev;
3259         struct p_rs_param_95 *p;
3260         unsigned int header_size, data_size, exp_max_sz;
3261         struct crypto_hash *verify_tfm = NULL;
3262         struct crypto_hash *csums_tfm = NULL;
3263         struct net_conf *old_net_conf, *new_net_conf = NULL;
3264         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3265         const int apv = tconn->agreed_pro_version;
3266         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3267         int fifo_size = 0;
3268         int err;
3269
3270         mdev = vnr_to_mdev(tconn, pi->vnr);
3271         if (!mdev)
3272                 return config_unknown_volume(tconn, pi);
3273
3274         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3275                     : apv == 88 ? sizeof(struct p_rs_param)
3276                                         + SHARED_SECRET_MAX
3277                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3278                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3279
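        /* The resync parameter packet grew over time: up to apv 87 it only
         * carries the resync rate, apv 88 appends the verify-alg name as
         * trailing data, apv 89..94 use p_rs_param_89 (verify-alg and
         * csums-alg inside the struct), and apv 95+ add the dynamic resync
         * controller settings (c_plan_ahead etc.) in p_rs_param_95. */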
3280         if (pi->size > exp_max_sz) {
3281                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3282                     pi->size, exp_max_sz);
3283                 return -EIO;
3284         }
3285
3286         if (apv <= 88) {
3287                 header_size = sizeof(struct p_rs_param);
3288                 data_size = pi->size - header_size;
3289         } else if (apv <= 94) {
3290                 header_size = sizeof(struct p_rs_param_89);
3291                 data_size = pi->size - header_size;
3292                 D_ASSERT(data_size == 0);
3293         } else {
3294                 header_size = sizeof(struct p_rs_param_95);
3295                 data_size = pi->size - header_size;
3296                 D_ASSERT(data_size == 0);
3297         }
3298
3299         /* initialize verify_alg and csums_alg */
3300         p = pi->data;
3301         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3302
3303         err = drbd_recv_all(mdev->tconn, p, header_size);
3304         if (err)
3305                 return err;
3306
3307         mutex_lock(&mdev->tconn->conf_update);
3308         old_net_conf = mdev->tconn->net_conf;
3309         if (get_ldev(mdev)) {
3310                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3311                 if (!new_disk_conf) {
3312                         put_ldev(mdev);
3313                         mutex_unlock(&mdev->tconn->conf_update);
3314                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3315                         return -ENOMEM;
3316                 }
3317
3318                 old_disk_conf = mdev->ldev->disk_conf;
3319                 *new_disk_conf = *old_disk_conf;
3320
3321                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3322         }
3323
3324         if (apv >= 88) {
3325                 if (apv == 88) {
3326                         if (data_size > SHARED_SECRET_MAX) {
3327                                 dev_err(DEV, "verify-alg too long, "
3328                                     "peer wants %u, accepting only %u byte\n",
3329                                                 data_size, SHARED_SECRET_MAX);
3330                                 err = -EIO;
3331                                 goto reconnect;
3332                         }
3333
3334                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3335                         if (err)
3336                                 goto reconnect;
3337                         /* we expect NUL terminated string */
3338                         /* but just in case someone tries to be evil */
3339                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3340                         p->verify_alg[data_size-1] = 0;
3341
3342                 } else /* apv >= 89 */ {
3343                         /* we still expect NUL terminated strings */
3344                         /* but just in case someone tries to be evil */
3345                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3346                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3347                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3348                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3349                 }
3350
3351                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3352                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3353                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3354                                     old_net_conf->verify_alg, p->verify_alg);
3355                                 goto disconnect;
3356                         }
3357                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3358                                         p->verify_alg, "verify-alg");
3359                         if (IS_ERR(verify_tfm)) {
3360                                 verify_tfm = NULL;
3361                                 goto disconnect;
3362                         }
3363                 }
3364
3365                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3366                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3367                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3368                                     old_net_conf->csums_alg, p->csums_alg);
3369                                 goto disconnect;
3370                         }
3371                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3372                                         p->csums_alg, "csums-alg");
3373                         if (IS_ERR(csums_tfm)) {
3374                                 csums_tfm = NULL;
3375                                 goto disconnect;
3376                         }
3377                 }
3378
3379                 if (apv > 94 && new_disk_conf) {
3380                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3381                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3382                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3383                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3384
3385                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3386                         if (fifo_size != mdev->rs_plan_s->size) {
3387                                 new_plan = fifo_alloc(fifo_size);
3388                                 if (!new_plan) {
3389                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3390                                         put_ldev(mdev);
3391                                         goto disconnect;
3392                                 }
3393                         }
3394                 }
3395
3396                 if (verify_tfm || csums_tfm) {
3397                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3398                         if (!new_net_conf) {
3399                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3400                                 goto disconnect;
3401                         }
3402
3403                         *new_net_conf = *old_net_conf;
3404
3405                         if (verify_tfm) {
3406                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3407                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3408                                 crypto_free_hash(mdev->tconn->verify_tfm);
3409                                 mdev->tconn->verify_tfm = verify_tfm;
3410                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3411                         }
3412                         if (csums_tfm) {
3413                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3414                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3415                                 crypto_free_hash(mdev->tconn->csums_tfm);
3416                                 mdev->tconn->csums_tfm = csums_tfm;
3417                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3418                         }
3419                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3420                 }
3421         }
3422
3423         if (new_disk_conf) {
3424                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3425                 put_ldev(mdev);
3426         }
3427
3428         if (new_plan) {
3429                 old_plan = mdev->rs_plan_s;
3430                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3431         }
3432
3433         mutex_unlock(&mdev->tconn->conf_update);
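        /* Typical RCU update pattern: the new conf/plan pointers were
         * published with rcu_assign_pointer() above; wait until all readers
         * of the old versions are done before freeing them. */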
3434         synchronize_rcu();
3435         if (new_net_conf)
3436                 kfree(old_net_conf);
3437         kfree(old_disk_conf);
3438         kfree(old_plan);
3439
3440         return 0;
3441
3442 reconnect:
3443         if (new_disk_conf) {
3444                 put_ldev(mdev);
3445                 kfree(new_disk_conf);
3446         }
3447         mutex_unlock(&mdev->tconn->conf_update);
3448         return -EIO;
3449
3450 disconnect:
3451         kfree(new_plan);
3452         if (new_disk_conf) {
3453                 put_ldev(mdev);
3454                 kfree(new_disk_conf);
3455         }
3456         mutex_unlock(&mdev->tconn->conf_update);
3457         /* just for completeness: actually not needed,
3458          * as this is not reached if csums_tfm was ok. */
3459         crypto_free_hash(csums_tfm);
3460         /* but free the verify_tfm again, if csums_tfm did not work out */
3461         crypto_free_hash(verify_tfm);
3462         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3463         return -EIO;
3464 }
3465
3466 /* warn if the arguments differ by more than 12.5% */
3467 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3468         const char *s, sector_t a, sector_t b)
3469 {
3470         sector_t d;
3471         if (a == 0 || b == 0)
3472                 return;
3473         d = (a > b) ? (a - b) : (b - a);
3474         if (d > (a>>3) || d > (b>>3))
3475                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3476                      (unsigned long long)a, (unsigned long long)b);
3477 }
3478
3479 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3480 {
3481         struct drbd_conf *mdev;
3482         struct p_sizes *p = pi->data;
3483         enum determine_dev_size dd = unchanged;
3484         sector_t p_size, p_usize, my_usize;
3485         int ldsc = 0; /* local disk size changed */
3486         enum dds_flags ddsf;
3487
3488         mdev = vnr_to_mdev(tconn, pi->vnr);
3489         if (!mdev)
3490                 return config_unknown_volume(tconn, pi);
3491
3492         p_size = be64_to_cpu(p->d_size);
3493         p_usize = be64_to_cpu(p->u_size);
3494
3495         /* just store the peer's disk size for now.
3496          * we still need to figure out whether we accept that. */
3497         mdev->p_size = p_size;
3498
3499         if (get_ldev(mdev)) {
3500                 rcu_read_lock();
3501                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3502                 rcu_read_unlock();
3503
3504                 warn_if_differ_considerably(mdev, "lower level device sizes",
3505                            p_size, drbd_get_max_capacity(mdev->ldev));
3506                 warn_if_differ_considerably(mdev, "user requested size",
3507                                             p_usize, my_usize);
3508
3509                 /* if this is the first connect, or an otherwise expected
3510                  * param exchange, choose the minimum */
3511                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3512                         p_usize = min_not_zero(my_usize, p_usize);
3513
3514                 /* Never shrink a device with usable data during connect.
3515                    But allow online shrinking if we are connected. */
3516                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3517                     drbd_get_capacity(mdev->this_bdev) &&
3518                     mdev->state.disk >= D_OUTDATED &&
3519                     mdev->state.conn < C_CONNECTED) {
3520                         dev_err(DEV, "The peer's disk size is too small!\n");
3521                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3522                         put_ldev(mdev);
3523                         return -EIO;
3524                 }
3525
3526                 if (my_usize != p_usize) {
3527                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3528
3529                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3530                         if (!new_disk_conf) {
3531                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3532                                 put_ldev(mdev);
3533                                 return -ENOMEM;
3534                         }
3535
3536                         mutex_lock(&mdev->tconn->conf_update);
3537                         old_disk_conf = mdev->ldev->disk_conf;
3538                         *new_disk_conf = *old_disk_conf;
3539                         new_disk_conf->disk_size = p_usize;
3540
3541                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3542                         mutex_unlock(&mdev->tconn->conf_update);
3543                         synchronize_rcu();
3544                         kfree(old_disk_conf);
3545
3546                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3547                                  (unsigned long)p_usize);
3548                 }
3549
3550                 put_ldev(mdev);
3551         }
3552
3553         ddsf = be16_to_cpu(p->dds_flags);
3554         if (get_ldev(mdev)) {
3555                 dd = drbd_determine_dev_size(mdev, ddsf);
3556                 put_ldev(mdev);
3557                 if (dd == dev_size_error)
3558                         return -EIO;
3559                 drbd_md_sync(mdev);
3560         } else {
3561                 /* I am diskless, need to accept the peer's size. */
3562                 drbd_set_my_capacity(mdev, p_size);
3563         }
3564
3565         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3566         drbd_reconsider_max_bio_size(mdev);
3567
3568         if (get_ldev(mdev)) {
3569                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3570                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3571                         ldsc = 1;
3572                 }
3573
3574                 put_ldev(mdev);
3575         }
3576
3577         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3578                 if (be64_to_cpu(p->c_size) !=
3579                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3580                         /* we have different sizes, probably peer
3581                          * needs to know my new size... */
3582                         drbd_send_sizes(mdev, 0, ddsf);
3583                 }
3584                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3585                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3586                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3587                             mdev->state.disk >= D_INCONSISTENT) {
3588                                 if (ddsf & DDSF_NO_RESYNC)
3589                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3590                                 else
3591                                         resync_after_online_grow(mdev);
3592                         } else
3593                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3594                 }
3595         }
3596
3597         return 0;
3598 }
3599
3600 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3601 {
3602         struct drbd_conf *mdev;
3603         struct p_uuids *p = pi->data;
3604         u64 *p_uuid;
3605         int i, updated_uuids = 0;
3606
3607         mdev = vnr_to_mdev(tconn, pi->vnr);
3608         if (!mdev)
3609                 return config_unknown_volume(tconn, pi);
3610
3611         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return false;
        }
3612
3613         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3614                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3615
3616         kfree(mdev->p_uuid);
3617         mdev->p_uuid = p_uuid;
3618
3619         if (mdev->state.conn < C_CONNECTED &&
3620             mdev->state.disk < D_INCONSISTENT &&
3621             mdev->state.role == R_PRIMARY &&
3622             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3623                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3624                     (unsigned long long)mdev->ed_uuid);
3625                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3626                 return -EIO;
3627         }
3628
3629         if (get_ldev(mdev)) {
3630                 int skip_initial_sync =
3631                         mdev->state.conn == C_CONNECTED &&
3632                         mdev->tconn->agreed_pro_version >= 90 &&
3633                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3634                         (p_uuid[UI_FLAGS] & 8);
3635                 if (skip_initial_sync) {
3636                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3637                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3638                                         "clear_n_write from receive_uuids",
3639                                         BM_LOCKED_TEST_ALLOWED);
3640                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3641                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3642                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3643                                         CS_VERBOSE, NULL);
3644                         drbd_md_sync(mdev);
3645                         updated_uuids = 1;
3646                 }
3647                 put_ldev(mdev);
3648         } else if (mdev->state.disk < D_INCONSISTENT &&
3649                    mdev->state.role == R_PRIMARY) {
3650                 /* I am a diskless primary, the peer just created a new current UUID
3651                    for me. */
3652                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3653         }
3654
3655         /* Before we test for the disk state, we should wait until a possibly
3656            ongoing cluster-wide state change has finished. That is important if
3657            we are primary and are detaching from our disk. We need to see the
3658            new disk state... */
3659         mutex_lock(mdev->state_mutex);
3660         mutex_unlock(mdev->state_mutex);
3661         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3662                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3663
3664         if (updated_uuids)
3665                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3666
3667         return 0;
3668 }
3669
3670 /**
3671  * convert_state() - Converts the peer's view of the cluster state to our point of view
3672  * @ps:         The state as seen by the peer.
3673  */
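/* The conversion is a plain mirror: the peer's role becomes our "peer", its
 * disk state becomes our "pdsk", and asymmetric connection states (e.g.
 * C_STARTING_SYNC_S vs. C_STARTING_SYNC_T) are swapped via c_tab. */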
3674 static union drbd_state convert_state(union drbd_state ps)
3675 {
3676         union drbd_state ms;
3677
3678         static enum drbd_conns c_tab[] = {
3679                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3680                 [C_CONNECTED] = C_CONNECTED,
3681
3682                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3683                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3684                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3685                 [C_VERIFY_S]       = C_VERIFY_T,
3686                 [C_MASK]   = C_MASK,
3687         };
3688
3689         ms.i = ps.i;
3690
3691         ms.conn = c_tab[ps.conn];
3692         ms.peer = ps.role;
3693         ms.role = ps.peer;
3694         ms.pdsk = ps.disk;
3695         ms.disk = ps.pdsk;
3696         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3697
3698         return ms;
3699 }
3700
3701 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3702 {
3703         struct drbd_conf *mdev;
3704         struct p_req_state *p = pi->data;
3705         union drbd_state mask, val;
3706         enum drbd_state_rv rv;
3707
3708         mdev = vnr_to_mdev(tconn, pi->vnr);
3709         if (!mdev)
3710                 return -EIO;
3711
3712         mask.i = be32_to_cpu(p->mask);
3713         val.i = be32_to_cpu(p->val);
3714
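        /* If we are the side that may discard concurrent requests
         * (DISCARD_CONCURRENT) and are already in the middle of our own state
         * change (state_mutex is held), reject the peer's request with
         * SS_CONCURRENT_ST_CHG instead of racing with it. */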
3715         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3716             mutex_is_locked(mdev->state_mutex)) {
3717                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3718                 return 0;
3719         }
3720
3721         mask = convert_state(mask);
3722         val = convert_state(val);
3723
3724         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3725         drbd_send_sr_reply(mdev, rv);
3726
3727         drbd_md_sync(mdev);
3728
3729         return 0;
3730 }
3731
3732 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3733 {
3734         struct p_req_state *p = pi->data;
3735         union drbd_state mask, val;
3736         enum drbd_state_rv rv;
3737
3738         mask.i = be32_to_cpu(p->mask);
3739         val.i = be32_to_cpu(p->val);
3740
3741         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3742             mutex_is_locked(&tconn->cstate_mutex)) {
3743                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3744                 return 0;
3745         }
3746
3747         mask = convert_state(mask);
3748         val = convert_state(val);
3749
3750         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3751         conn_send_sr_reply(tconn, rv);
3752
3753         return 0;
3754 }
3755
3756 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3757 {
3758         struct drbd_conf *mdev;
3759         struct p_state *p = pi->data;
3760         union drbd_state os, ns, peer_state;
3761         enum drbd_disk_state real_peer_disk;
3762         enum chg_state_flags cs_flags;
3763         int rv;
3764
3765         mdev = vnr_to_mdev(tconn, pi->vnr);
3766         if (!mdev)
3767                 return config_unknown_volume(tconn, pi);
3768
3769         peer_state.i = be32_to_cpu(p->state);
3770
3771         real_peer_disk = peer_state.disk;
3772         if (peer_state.disk == D_NEGOTIATING) {
3773                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3774                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3775         }
3776
3777         spin_lock_irq(&mdev->tconn->req_lock);
3778  retry:
3779         os = ns = drbd_read_state(mdev);
3780         spin_unlock_irq(&mdev->tconn->req_lock);
3781
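        /* The state was only sampled above; the sync handshake further down
         * may sleep.  If the state changes in the meantime, the check under
         * req_lock near the end of this function jumps back to "retry". */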
3782         /* If this is the "end of sync" confirmation, usually the peer disk
3783          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3784          * set) resync started in PausedSyncT, or if the timing of pause-/
3785          * unpause-sync events has been "just right", the peer disk may
3786          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3787          */
3788         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3789             real_peer_disk == D_UP_TO_DATE &&
3790             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3791                 /* If we are (becoming) SyncSource, but peer is still in sync
3792                  * preparation, ignore its uptodate-ness to avoid flapping, it
3793                  * will change to inconsistent once the peer reaches active
3794                  * syncing states.
3795                  * It may have changed syncer-paused flags, however, so we
3796                  * cannot ignore this completely. */
3797                 if (peer_state.conn > C_CONNECTED &&
3798                     peer_state.conn < C_SYNC_SOURCE)
3799                         real_peer_disk = D_INCONSISTENT;
3800
3801                 /* if peer_state changes to connected at the same time,
3802                  * it explicitly notifies us that it finished resync.
3803                  * Maybe we should finish it up, too? */
3804                 else if (os.conn >= C_SYNC_SOURCE &&
3805                          peer_state.conn == C_CONNECTED) {
3806                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3807                                 drbd_resync_finished(mdev);
3808                         return 0;
3809                 }
3810         }
3811
3812         /* peer says his disk is inconsistent, while we think it is uptodate,
3813          * and this happens while the peer still thinks we have a sync going on,
3814          * but we think we are already done with the sync.
3815          * We ignore this to avoid flapping pdsk.
3816          * This should not happen, if the peer is a recent version of drbd. */
3817         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3818             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3819                 real_peer_disk = D_UP_TO_DATE;
3820
3821         if (ns.conn == C_WF_REPORT_PARAMS)
3822                 ns.conn = C_CONNECTED;
3823
3824         if (peer_state.conn == C_AHEAD)
3825                 ns.conn = C_BEHIND;
3826
3827         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3828             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3829                 int cr; /* consider resync */
3830
3831                 /* if we established a new connection */
3832                 cr  = (os.conn < C_CONNECTED);
3833                 /* if we had an established connection
3834                  * and one of the nodes newly attaches a disk */
3835                 cr |= (os.conn == C_CONNECTED &&
3836                        (peer_state.disk == D_NEGOTIATING ||
3837                         os.disk == D_NEGOTIATING));
3838                 /* if we have both been inconsistent, and the peer has been
3839                  * forced to be UpToDate with --overwrite-data */
3840                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3841                 /* if we had been plain connected, and the admin requested to
3842                  * start a sync by "invalidate" or "invalidate-remote" */
3843                 cr |= (os.conn == C_CONNECTED &&
3844                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3845                                  peer_state.conn <= C_WF_BITMAP_T));
3846
3847                 if (cr)
3848                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3849
3850                 put_ldev(mdev);
3851                 if (ns.conn == C_MASK) {
3852                         ns.conn = C_CONNECTED;
3853                         if (mdev->state.disk == D_NEGOTIATING) {
3854                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3855                         } else if (peer_state.disk == D_NEGOTIATING) {
3856                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3857                                 peer_state.disk = D_DISKLESS;
3858                                 real_peer_disk = D_DISKLESS;
3859                         } else {
3860                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3861                                         return -EIO;
3862                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3863                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3864                                 return -EIO;
3865                         }
3866                 }
3867         }
3868
3869         spin_lock_irq(&mdev->tconn->req_lock);
3870         if (os.i != drbd_read_state(mdev).i)
3871                 goto retry;
3872         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3873         ns.peer = peer_state.role;
3874         ns.pdsk = real_peer_disk;
3875         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3876         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3877                 ns.disk = mdev->new_state_tmp.disk;
3878         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3879         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3880             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3881                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3882                    for temporary network outages! */
3883                 spin_unlock_irq(&mdev->tconn->req_lock);
3884                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3885                 tl_clear(mdev->tconn);
3886                 drbd_uuid_new_current(mdev);
3887                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3888                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3889                 return -EIO;
3890         }
3891         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3892         ns = drbd_read_state(mdev);
3893         spin_unlock_irq(&mdev->tconn->req_lock);
3894
3895         if (rv < SS_SUCCESS) {
3896                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3897                 return -EIO;
3898         }
3899
3900         if (os.conn > C_WF_REPORT_PARAMS) {
3901                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3902                     peer_state.disk != D_NEGOTIATING ) {
3903                         /* we want resync, peer has not yet decided to sync... */
3904                         /* Nowadays only used when forcing a node into primary role and
3905                            setting its disk to UpToDate with that */
3906                         drbd_send_uuids(mdev);
3907                         drbd_send_current_state(mdev);
3908                 }
3909         }
3910
3911         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3912
3913         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3914
3915         return 0;
3916 }
3917
3918 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3919 {
3920         struct drbd_conf *mdev;
3921         struct p_rs_uuid *p = pi->data;
3922
3923         mdev = vnr_to_mdev(tconn, pi->vnr);
3924         if (!mdev)
3925                 return -EIO;
3926
3927         wait_event(mdev->misc_wait,
3928                    mdev->state.conn == C_WF_SYNC_UUID ||
3929                    mdev->state.conn == C_BEHIND ||
3930                    mdev->state.conn < C_CONNECTED ||
3931                    mdev->state.disk < D_NEGOTIATING);
3932
3933         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3934
3935         /* Here the _drbd_uuid_ functions are right, current should
3936            _not_ be rotated into the history */
3937         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3938                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3939                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3940
3941                 drbd_print_uuids(mdev, "updated sync uuid");
3942                 drbd_start_resync(mdev, C_SYNC_TARGET);
3943
3944                 put_ldev(mdev);
3945         } else
3946                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3947
3948         return 0;
3949 }
3950
3951 /**
3952  * receive_bitmap_plain
3953  *
3954  * Return 0 when done, 1 when another iteration is needed, and a negative error
3955  * code upon failure.
3956  */
3957 static int
3958 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3959                      unsigned long *p, struct bm_xfer_ctx *c)
3960 {
3961         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3962                                  drbd_header_size(mdev->tconn);
3963         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3964                                        c->bm_words - c->word_offset);
3965         unsigned int want = num_words * sizeof(*p);
3966         int err;
3967
3968         if (want != size) {
3969                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3970                 return -EIO;
3971         }
3972         if (want == 0)
3973                 return 0;
3974         err = drbd_recv_all(mdev->tconn, p, want);
3975         if (err)
3976                 return err;
3977
3978         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3979
3980         c->word_offset += num_words;
3981         c->bit_offset = c->word_offset * BITS_PER_LONG;
3982         if (c->bit_offset > c->bm_bits)
3983                 c->bit_offset = c->bm_bits;
3984
3985         return 1;
3986 }
3987
3988 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3989 {
3990         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3991 }
3992
3993 static int dcbp_get_start(struct p_compressed_bm *p)
3994 {
3995         return (p->encoding & 0x80) != 0;
3996 }
3997
3998 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3999 {
4000         return (p->encoding >> 4) & 0x7;
4001 }
4002
4003 /**
4004  * recv_bm_rle_bits
4005  *
4006  * Return 0 when done, 1 when another iteration is needed, and a negative error
4007  * code upon failure.
4008  */
4009 static int
4010 recv_bm_rle_bits(struct drbd_conf *mdev,
4011                 struct p_compressed_bm *p,
4012                  struct bm_xfer_ctx *c,
4013                  unsigned int len)
4014 {
4015         struct bitstream bs;
4016         u64 look_ahead;
4017         u64 rl;
4018         u64 tmp;
4019         unsigned long s = c->bit_offset;
4020         unsigned long e;
4021         int toggle = dcbp_get_start(p);
4022         int have;
4023         int bits;
4024
4025         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4026
4027         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4028         if (bits < 0)
4029                 return -EIO;
4030
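        /* The payload is a sequence of VLI-encoded run lengths.  Runs
         * alternate between "bits clear" and "bits set", starting with
         * whatever dcbp_get_start() announced; only the "set" runs are
         * applied to the bitmap below, the "clear" runs are merely skipped. */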
4031         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4032                 bits = vli_decode_bits(&rl, look_ahead);
4033                 if (bits <= 0)
4034                         return -EIO;
4035
4036                 if (toggle) {
4037                         e = s + rl -1;
4038                         if (e >= c->bm_bits) {
4039                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4040                                 return -EIO;
4041                         }
4042                         _drbd_bm_set_bits(mdev, s, e);
4043                 }
4044
4045                 if (have < bits) {
4046                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4047                                 have, bits, look_ahead,
4048                                 (unsigned int)(bs.cur.b - p->code),
4049                                 (unsigned int)bs.buf_len);
4050                         return -EIO;
4051                 }
4052                 look_ahead >>= bits;
4053                 have -= bits;
4054
4055                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4056                 if (bits < 0)
4057                         return -EIO;
4058                 look_ahead |= tmp << have;
4059                 have += bits;
4060         }
4061
4062         c->bit_offset = s;
4063         bm_xfer_ctx_bit_to_word_offset(c);
4064
4065         return (s != c->bm_bits);
4066 }
4067
4068 /**
4069  * decode_bitmap_c
4070  *
4071  * Return 0 when done, 1 when another iteration is needed, and a negative error
4072  * code upon failure.
4073  */
4074 static int
4075 decode_bitmap_c(struct drbd_conf *mdev,
4076                 struct p_compressed_bm *p,
4077                 struct bm_xfer_ctx *c,
4078                 unsigned int len)
4079 {
4080         if (dcbp_get_code(p) == RLE_VLI_Bits)
4081                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4082
4083         /* other variants had been implemented for evaluation,
4084          * but have been dropped as this one turned out to be "best"
4085          * during all our tests. */
4086
4087         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4088         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4089         return -EIO;
4090 }
4091
4092 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4093                 const char *direction, struct bm_xfer_ctx *c)
4094 {
4095         /* what would it take to transfer it "plaintext" */
4096         unsigned int header_size = drbd_header_size(mdev->tconn);
4097         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4098         unsigned int plain =
4099                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4100                 c->bm_words * sizeof(unsigned long);
4101         unsigned int total = c->bytes[0] + c->bytes[1];
4102         unsigned int r;
4103
4104         /* total cannot be zero, but just in case: */
4105         if (total == 0)
4106                 return;
4107
4108         /* don't report if not compressed */
4109         if (total >= plain)
4110                 return;
4111
4112         /* total < plain. check for overflow, still */
4113         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4114                                     : (1000 * total / plain);
4115
4116         if (r > 1000)
4117                 r = 1000;
4118
4119         r = 1000 - r;
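        /* Worked example with made-up numbers: if the plain transfer would
         * have been plain = 4096 bytes and we actually sent total = 1024
         * bytes, then r = 1000 * 1024 / 4096 = 250, and we report
         * 1000 - 250 = 750, printed below as "compression: 75.0%". */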
4120         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4121              "total %u; compression: %u.%u%%\n",
4122                         direction,
4123                         c->bytes[1], c->packets[1],
4124                         c->bytes[0], c->packets[0],
4125                         total, r/10, r % 10);
4126 }
4127
4128 /* Since we are processing the bitfield from lower addresses to higher,
4129    it does not matter whether we process it in 32 bit or 64 bit chunks,
4130    as long as it is little endian. (Understand it as a byte stream,
4131    beginning with the lowest byte...) If we used big endian instead,
4132    we would need to process it from the highest address to the lowest,
4133    in order to be agnostic to the 32 vs 64 bit issue.
4134
4135    Returns 0 on success, a negative error code otherwise. */
4136 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4137 {
4138         struct drbd_conf *mdev;
4139         struct bm_xfer_ctx c;
4140         int err;
4141
4142         mdev = vnr_to_mdev(tconn, pi->vnr);
4143         if (!mdev)
4144                 return -EIO;
4145
4146         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4147         /* you are supposed to send additional out-of-sync information
4148          * if you actually set bits during this phase */
4149
4150         c = (struct bm_xfer_ctx) {
4151                 .bm_bits = drbd_bm_bits(mdev),
4152                 .bm_words = drbd_bm_words(mdev),
4153         };
4154
4155         for (;;) {
4156                 if (pi->cmd == P_BITMAP)
4157                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4158                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4159                         /* MAYBE: sanity check that we speak proto >= 90,
4160                          * and the feature is enabled! */
4161                         struct p_compressed_bm *p = pi->data;
4162
4163                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4164                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4165                                 err = -EIO;
4166                                 goto out;
4167                         }
4168                         if (pi->size <= sizeof(*p)) {
4169                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4170                                 err = -EIO;
4171                                 goto out;
4172                         }
4173                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4174                         if (err)
4175                                goto out;
4176                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4177                 } else {
4178                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4179                         err = -EIO;
4180                         goto out;
4181                 }
4182
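                /* Account bytes/packets per encoding: index 1 counts plain
                 * P_BITMAP packets, index 0 compressed ones (the boolean
                 * index below is 1 only for P_BITMAP). */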
4183                 c.packets[pi->cmd == P_BITMAP]++;
4184                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4185
4186                 if (err <= 0) {
4187                         if (err < 0)
4188                                 goto out;
4189                         break;
4190                 }
4191                 err = drbd_recv_header(mdev->tconn, pi);
4192                 if (err)
4193                         goto out;
4194         }
4195
4196         INFO_bm_xfer_stats(mdev, "receive", &c);
4197
4198         if (mdev->state.conn == C_WF_BITMAP_T) {
4199                 enum drbd_state_rv rv;
4200
4201                 err = drbd_send_bitmap(mdev);
4202                 if (err)
4203                         goto out;
4204                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4205                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4206                 D_ASSERT(rv == SS_SUCCESS);
4207         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4208                 /* admin may have requested C_DISCONNECTING,
4209                  * other threads may have noticed network errors */
4210                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4211                     drbd_conn_str(mdev->state.conn));
4212         }
4213         err = 0;
4214
4215  out:
4216         drbd_bm_unlock(mdev);
4217         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4218                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4219         return err;
4220 }
4221
4222 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4223 {
4224         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4225                  pi->cmd, pi->size);
4226
4227         return ignore_remaining_packet(tconn, pi);
4228 }
4229
4230 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4231 {
4232         /* Make sure we've acked all the TCP data associated
4233          * with the data requests being unplugged */
4234         drbd_tcp_quickack(tconn->data.socket);
4235
4236         return 0;
4237 }
4238
4239 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4240 {
4241         struct drbd_conf *mdev;
4242         struct p_block_desc *p = pi->data;
4243
4244         mdev = vnr_to_mdev(tconn, pi->vnr);
4245         if (!mdev)
4246                 return -EIO;
4247
4248         switch (mdev->state.conn) {
4249         case C_WF_SYNC_UUID:
4250         case C_WF_BITMAP_T:
4251         case C_BEHIND:
4252                 break;
4253         default:
4254                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4255                                 drbd_conn_str(mdev->state.conn));
4256         }
4257
4258         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4259
4260         return 0;
4261 }
4262
4263 struct data_cmd {
4264         int expect_payload;
4265         size_t pkt_size;
4266         int (*fn)(struct drbd_tconn *, struct packet_info *);
4267 };
4268
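/* Dispatch table for packets arriving on the data socket, indexed by packet
 * type.  pkt_size is the fixed (sub-)header that drbdd() reads before calling
 * fn(); expect_payload states whether additional payload beyond that header
 * is allowed. */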
4269 static struct data_cmd drbd_cmd_handler[] = {
4270         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4271         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4272         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4273         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4274         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4275         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4276         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4277         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4278         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4279         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4280         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4281         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4282         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4283         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4284         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4285         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4286         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4287         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4288         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4289         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4290         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4291         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4292         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4293         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4294 };
4295
4296 static void drbdd(struct drbd_tconn *tconn)
4297 {
4298         struct packet_info pi;
4299         size_t shs; /* sub header size */
4300         int err;
4301
4302         while (get_t_state(&tconn->receiver) == RUNNING) {
4303                 struct data_cmd *cmd;
4304
4305                 drbd_thread_current_set_cpu(&tconn->receiver);
4306                 if (drbd_recv_header(tconn, &pi))
4307                         goto err_out;
4308
4309                 cmd = &drbd_cmd_handler[pi.cmd];
4310                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4311                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4312                                  cmdname(pi.cmd), pi.cmd);
4313                         goto err_out;
4314                 }
4315
4316                 shs = cmd->pkt_size;
4317                 if (pi.size > shs && !cmd->expect_payload) {
4318                         conn_err(tconn, "No payload expected %s l:%d\n",
4319                                  cmdname(pi.cmd), pi.size);
4320                         goto err_out;
4321                 }
4322
4323                 if (shs) {
4324                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4325                         if (err)
4326                                 goto err_out;
4327                         pi.size -= shs;
4328                 }
4329
4330                 err = cmd->fn(tconn, &pi);
4331                 if (err) {
4332                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4333                                  cmdname(pi.cmd), err, pi.size);
4334                         goto err_out;
4335                 }
4336         }
4337         return;
4338
4339     err_out:
4340         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4341 }
4342
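/* Wait until all work queued on this connection's worker so far has been
 * processed: queue a barrier work item whose callback (w_prev_work_done) is
 * expected to complete barr.done, and block until it has run. */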
4343 void conn_flush_workqueue(struct drbd_tconn *tconn)
4344 {
4345         struct drbd_wq_barrier barr;
4346
4347         barr.w.cb = w_prev_work_done;
4348         barr.w.tconn = tconn;
4349         init_completion(&barr.done);
4350         drbd_queue_work(&tconn->data.work, &barr.w);
4351         wait_for_completion(&barr.done);
4352 }
4353
4354 static void conn_disconnect(struct drbd_tconn *tconn)
4355 {
4356         struct drbd_conf *mdev;
4357         enum drbd_conns oc;
4358         int vnr;
4359
4360         if (tconn->cstate == C_STANDALONE)
4361                 return;
4362
4363         /* asender does not clean up anything. it must not interfere, either */
4364         drbd_thread_stop(&tconn->asender);
4365         drbd_free_sock(tconn);
4366
4367         rcu_read_lock();
4368         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4369                 kref_get(&mdev->kref);
4370                 rcu_read_unlock();
4371                 drbd_disconnected(mdev);
4372                 kref_put(&mdev->kref, &drbd_minor_destroy);
4373                 rcu_read_lock();
4374         }
4375         rcu_read_unlock();
4376
4377         if (!list_empty(&tconn->current_epoch->list))
4378                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4379         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4380         atomic_set(&tconn->current_epoch->epoch_size, 0);
4381
4382         conn_info(tconn, "Connection closed\n");
4383
4384         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4385                 conn_try_outdate_peer_async(tconn);
4386
4387         spin_lock_irq(&tconn->req_lock);
4388         oc = tconn->cstate;
4389         if (oc >= C_UNCONNECTED)
4390                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4391
4392         spin_unlock_irq(&tconn->req_lock);
4393
4394         if (oc == C_DISCONNECTING)
4395                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4396 }
4397
4398 static int drbd_disconnected(struct drbd_conf *mdev)
4399 {
4400         unsigned int i;
4401
4402         /* wait for current activity to cease. */
4403         spin_lock_irq(&mdev->tconn->req_lock);
4404         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4405         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4406         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4407         spin_unlock_irq(&mdev->tconn->req_lock);
4408
4409         /* We do not have data structures that would allow us to
4410          * get the rs_pending_cnt down to 0 again.
4411          *  * On C_SYNC_TARGET we do not have any data structures describing
4412          *    the pending RSDataRequest's we have sent.
4413          *  * On C_SYNC_SOURCE there is no data structure that tracks
4414          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4415          *  And no, it is not the sum of the reference counts in the
4416          *  resync_LRU. The resync_LRU tracks the whole operation including
4417          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4418          *  on the fly. */
4419         drbd_rs_cancel_all(mdev);
4420         mdev->rs_total = 0;
4421         mdev->rs_failed = 0;
4422         atomic_set(&mdev->rs_pending_cnt, 0);
4423         wake_up(&mdev->misc_wait);
4424
4425         del_timer_sync(&mdev->resync_timer);
4426         resync_timer_fn((unsigned long)mdev);
4427
4428         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4429          * w_make_resync_request etc. which may still be on the worker queue
4430          * to be "canceled" */
4431         drbd_flush_workqueue(mdev);
4432
4433         drbd_finish_peer_reqs(mdev);
4434
4435         kfree(mdev->p_uuid);
4436         mdev->p_uuid = NULL;
4437
4438         if (!drbd_suspended(mdev))
4439                 tl_clear(mdev->tconn);
4440
4441         drbd_md_sync(mdev);
4442
4443         /* serialize with bitmap writeout triggered by the state change,
4444          * if any. */
4445         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4446
4447         /* tcp_close and release of sendpage pages can be deferred.  I don't
4448          * want to use SO_LINGER, because apparently it can be deferred for
4449          * more than 20 seconds (longest time I checked).
4450          *
4451          * Actually we don't care exactly when the network stack does its
4452          * put_page(); we just release our reference on these pages right here.
4453          */
4454         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4455         if (i)
4456                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4457         i = atomic_read(&mdev->pp_in_use_by_net);
4458         if (i)
4459                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4460         i = atomic_read(&mdev->pp_in_use);
4461         if (i)
4462                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4463
4464         D_ASSERT(list_empty(&mdev->read_ee));
4465         D_ASSERT(list_empty(&mdev->active_ee));
4466         D_ASSERT(list_empty(&mdev->sync_ee));
4467         D_ASSERT(list_empty(&mdev->done_ee));
4468
4469         return 0;
4470 }
4471
4472 /*
4473  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4474  * we can agree on is stored in agreed_pro_version.
4475  *
4476  * feature flags and the reserved array should be enough room for future
4477  * enhancements of the handshake protocol, and possible plugins...
4478  *
4479  * for now, they are expected to be zero, but ignored.
4480  */
4481 static int drbd_send_features(struct drbd_tconn *tconn)
4482 {
4483         struct drbd_socket *sock;
4484         struct p_connection_features *p;
4485
4486         sock = &tconn->data;
4487         p = conn_prepare_command(tconn, sock);
4488         if (!p)
4489                 return -EIO;
4490         memset(p, 0, sizeof(*p));
4491         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4492         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4493         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4494 }
4495
4496 /*
4497  * return values:
4498  *   1 yes, we have a valid connection
4499  *   0 oops, did not work out, please try again
4500  *  -1 peer talks different language,
4501  *     no point in trying again, please go standalone.
4502  */
4503 static int drbd_do_features(struct drbd_tconn *tconn)
4504 {
4505         /* ASSERT current == tconn->receiver ... */
4506         struct p_connection_features *p;
4507         const int expect = sizeof(struct p_connection_features);
4508         struct packet_info pi;
4509         int err;
4510
4511         err = drbd_send_features(tconn);
4512         if (err)
4513                 return 0;
4514
4515         err = drbd_recv_header(tconn, &pi);
4516         if (err)
4517                 return 0;
4518
4519         if (pi.cmd != P_CONNECTION_FEATURES) {
4520                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4521                          cmdname(pi.cmd), pi.cmd);
4522                 return -1;
4523         }
4524
4525         if (pi.size != expect) {
4526                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4527                      expect, pi.size);
4528                 return -1;
4529         }
4530
4531         p = pi.data;
4532         err = drbd_recv_all_warn(tconn, p, expect);
4533         if (err)
4534                 return 0;
4535
4536         p->protocol_min = be32_to_cpu(p->protocol_min);
4537         p->protocol_max = be32_to_cpu(p->protocol_max);
4538         if (p->protocol_max == 0)
4539                 p->protocol_max = p->protocol_min;
4540
4541         if (PRO_VERSION_MAX < p->protocol_min ||
4542             PRO_VERSION_MIN > p->protocol_max)
4543                 goto incompat;
4544
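        /* Example with made-up version numbers: if we support 86..100 and the
         * peer advertises 90..96, the ranges overlap and we agree on
         * min(100, 96) = 96. */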
4545         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4546
4547         conn_info(tconn, "Handshake successful: "
4548              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4549
4550         return 1;
4551
4552  incompat:
4553         conn_err(tconn, "incompatible DRBD dialects: "
4554             "I support %d-%d, peer supports %d-%d\n",
4555             PRO_VERSION_MIN, PRO_VERSION_MAX,
4556             p->protocol_min, p->protocol_max);
4557         return -1;
4558 }
4559
4560 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4561 static int drbd_do_auth(struct drbd_tconn *tconn)
4562 {
4563         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4564         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4565         return -1;
4566 }
4567 #else
4568 #define CHALLENGE_LEN 64
4569
4570 /* Return value:
4571         1 - auth succeeded,
4572         0 - failed, try again (network error),
4573         -1 - auth failed, don't try again.
4574 */
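/* Outline of the exchange implemented below (the peer is expected to run the
 * same steps from its side):
 *   1. send a random CHALLENGE_LEN byte challenge       (P_AUTH_CHALLENGE)
 *   2. receive the peer's challenge
 *   3. send HMAC(shared secret, peer's challenge) back  (P_AUTH_RESPONSE)
 *   4. receive the peer's response and compare it against
 *      HMAC(shared secret, our own challenge)
 */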
4575
4576 static int drbd_do_auth(struct drbd_tconn *tconn)
4577 {
4578         struct drbd_socket *sock;
4579         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4580         struct scatterlist sg;
4581         char *response = NULL;
4582         char *right_response = NULL;
4583         char *peers_ch = NULL;
4584         unsigned int key_len;
4585         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4586         unsigned int resp_size;
4587         struct hash_desc desc;
4588         struct packet_info pi;
4589         struct net_conf *nc;
4590         int err, rv;
4591
4592         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4593
4594         rcu_read_lock();
4595         nc = rcu_dereference(tconn->net_conf);
4596         key_len = strlen(nc->shared_secret);
4597         memcpy(secret, nc->shared_secret, key_len);
4598         rcu_read_unlock();
4599
4600         desc.tfm = tconn->cram_hmac_tfm;
4601         desc.flags = 0;
4602
4603         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4604         if (rv) {
4605                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4606                 rv = -1;
4607                 goto fail;
4608         }
4609
4610         get_random_bytes(my_challenge, CHALLENGE_LEN);
4611
4612         sock = &tconn->data;
4613         if (!conn_prepare_command(tconn, sock)) {
4614                 rv = 0;
4615                 goto fail;
4616         }
4617         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4618                                 my_challenge, CHALLENGE_LEN);
4619         if (!rv)
4620                 goto fail;
4621
4622         err = drbd_recv_header(tconn, &pi);
4623         if (err) {
4624                 rv = 0;
4625                 goto fail;
4626         }
4627
4628         if (pi.cmd != P_AUTH_CHALLENGE) {
4629                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4630                          cmdname(pi.cmd), pi.cmd);
4631                 rv = 0;
4632                 goto fail;
4633         }
4634
4635         if (pi.size > CHALLENGE_LEN * 2) {
4636                 conn_err(tconn, "AuthChallenge payload too big.\n");
4637                 rv = -1;
4638                 goto fail;
4639         }
4640
4641         peers_ch = kmalloc(pi.size, GFP_NOIO);
4642         if (peers_ch == NULL) {
4643                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4644                 rv = -1;
4645                 goto fail;
4646         }
4647
4648         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4649         if (err) {
4650                 rv = 0;
4651                 goto fail;
4652         }
4653
4654         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4655         response = kmalloc(resp_size, GFP_NOIO);
4656         if (response == NULL) {
4657                 conn_err(tconn, "kmalloc of response failed\n");
4658                 rv = -1;
4659                 goto fail;
4660         }
4661
4662         sg_init_table(&sg, 1);
4663         sg_set_buf(&sg, peers_ch, pi.size);
4664
4665         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4666         if (rv) {
4667                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4668                 rv = -1;
4669                 goto fail;
4670         }
4671
4672         if (!conn_prepare_command(tconn, sock)) {
4673                 rv = 0;
4674                 goto fail;
4675         }
4676         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4677                                 response, resp_size);
4678         if (!rv)
4679                 goto fail;
4680
4681         err = drbd_recv_header(tconn, &pi);
4682         if (err) {
4683                 rv = 0;
4684                 goto fail;
4685         }
4686
4687         if (pi.cmd != P_AUTH_RESPONSE) {
4688                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4689                          cmdname(pi.cmd), pi.cmd);
4690                 rv = 0;
4691                 goto fail;
4692         }
4693
4694         if (pi.size != resp_size) {
4695                 conn_err(tconn, "AuthResponse payload has the wrong size\n");
4696                 rv = 0;
4697                 goto fail;
4698         }
4699
4700         err = drbd_recv_all_warn(tconn, response, resp_size);
4701         if (err) {
4702                 rv = 0;
4703                 goto fail;
4704         }
4705
4706         right_response = kmalloc(resp_size, GFP_NOIO);
4707         if (right_response == NULL) {
4708                 conn_err(tconn, "kmalloc of right_response failed\n");
4709                 rv = -1;
4710                 goto fail;
4711         }
4712
4713         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4714
4715         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4716         if (rv) {
4717                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4718                 rv = -1;
4719                 goto fail;
4720         }
4721
4722         rv = !memcmp(response, right_response, resp_size);
4723
4724         if (rv)
4725                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4726                      resp_size);
4727         else
4728                 rv = -1;
4729
4730  fail:
4731         kfree(peers_ch);
4732         kfree(response);
4733         kfree(right_response);
4734
4735         return rv;
4736 }
4737 #endif
4738
4739 int drbdd_init(struct drbd_thread *thi)
4740 {
4741         struct drbd_tconn *tconn = thi->tconn;
4742         int h;
4743
4744         conn_info(tconn, "receiver (re)started\n");
4745
4746         do {
4747                 h = conn_connect(tconn);
4748                 if (h == 0) {
4749                         conn_disconnect(tconn);
4750                         schedule_timeout_interruptible(HZ);
4751                 }
4752                 if (h == -1) {
4753                         conn_warn(tconn, "Discarding network configuration.\n");
4754                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4755                 }
4756         } while (h == 0);
4757
4758         if (h > 0)
4759                 drbdd(tconn);
4760
4761         conn_disconnect(tconn);
4762
4763         conn_info(tconn, "receiver terminated\n");
4764         return 0;
4765 }
4766
4767 /* ********* acknowledge sender ******** */
4768
4769 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4770 {
4771         struct p_req_state_reply *p = pi->data;
4772         int retcode = be32_to_cpu(p->retcode);
4773
4774         if (retcode >= SS_SUCCESS) {
4775                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4776         } else {
4777                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4778                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4779                          drbd_set_st_err_str(retcode), retcode);
4780         }
4781         wake_up(&tconn->ping_wait);
4782
4783         return 0;
4784 }
4785
4786 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4787 {
4788         struct drbd_conf *mdev;
4789         struct p_req_state_reply *p = pi->data;
4790         int retcode = be32_to_cpu(p->retcode);
4791
4792         mdev = vnr_to_mdev(tconn, pi->vnr);
4793         if (!mdev)
4794                 return -EIO;
4795
4796         if (retcode >= SS_SUCCESS) {
4797                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4798         } else {
4799                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4800                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4801                         drbd_set_st_err_str(retcode), retcode);
4802         }
4803         wake_up(&mdev->state_wait);
4804
4805         return 0;
4806 }
4807
4808 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4809 {
4810         return drbd_send_ping_ack(tconn);
4811
4812 }
4813
4814 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4815 {
4816         /* restore idle timeout */
4817         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int * HZ;
4818         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4819                 wake_up(&tconn->ping_wait);
4820
4821         return 0;
4822 }
4823
4824 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4825 {
4826         struct drbd_conf *mdev;
4827         struct p_block_ack *p = pi->data;
4828         sector_t sector = be64_to_cpu(p->sector);
4829         int blksize = be32_to_cpu(p->blksize);
4830
4831         mdev = vnr_to_mdev(tconn, pi->vnr);
4832         if (!mdev)
4833                 return -EIO;
4834
4835         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4836
4837         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4838
4839         if (get_ldev(mdev)) {
4840                 drbd_rs_complete_io(mdev, sector);
4841                 drbd_set_in_sync(mdev, sector, blksize);
4842                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4843                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4844                 put_ldev(mdev);
4845         }
4846         dec_rs_pending(mdev);
4847         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4848
4849         return 0;
4850 }
4851
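/* Look up the request identified by (id, sector) in the given tree, feed the
 * state machine event "what" into it, and complete the master bio outside the
 * spinlock if that transition finished the request. */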
4852 static int
4853 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4854                               struct rb_root *root, const char *func,
4855                               enum drbd_req_event what, bool missing_ok)
4856 {
4857         struct drbd_request *req;
4858         struct bio_and_error m;
4859
4860         spin_lock_irq(&mdev->tconn->req_lock);
4861         req = find_request(mdev, root, id, sector, missing_ok, func);
4862         if (unlikely(!req)) {
4863                 spin_unlock_irq(&mdev->tconn->req_lock);
4864                 return -EIO;
4865         }
4866         __req_mod(req, what, &m);
4867         spin_unlock_irq(&mdev->tconn->req_lock);
4868
4869         if (m.bio)
4870                 complete_master_bio(mdev, &m);
4871         return 0;
4872 }
4873
4874 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4875 {
4876         struct drbd_conf *mdev;
4877         struct p_block_ack *p = pi->data;
4878         sector_t sector = be64_to_cpu(p->sector);
4879         int blksize = be32_to_cpu(p->blksize);
4880         enum drbd_req_event what;
4881
4882         mdev = vnr_to_mdev(tconn, pi->vnr);
4883         if (!mdev)
4884                 return -EIO;
4885
4886         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4887
4888         if (p->block_id == ID_SYNCER) {
4889                 drbd_set_in_sync(mdev, sector, blksize);
4890                 dec_rs_pending(mdev);
4891                 return 0;
4892         }
4893         switch (pi->cmd) {
4894         case P_RS_WRITE_ACK:
4895                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4896                 break;
4897         case P_WRITE_ACK:
4898                 what = WRITE_ACKED_BY_PEER;
4899                 break;
4900         case P_RECV_ACK:
4901                 what = RECV_ACKED_BY_PEER;
4902                 break;
4903         case P_DISCARD_WRITE:
4904                 what = DISCARD_WRITE;
4905                 break;
4906         case P_RETRY_WRITE:
4907                 what = POSTPONE_WRITE;
4908                 break;
4909         default:
4910                 BUG();
4911         }
4912
4913         return validate_req_change_req_state(mdev, p->block_id, sector,
4914                                              &mdev->write_requests, __func__,
4915                                              what, false);
4916 }
4917
4918 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4919 {
4920         struct drbd_conf *mdev;
4921         struct p_block_ack *p = pi->data;
4922         sector_t sector = be64_to_cpu(p->sector);
4923         int size = be32_to_cpu(p->blksize);
4924         int err;
4925
4926         mdev = vnr_to_mdev(tconn, pi->vnr);
4927         if (!mdev)
4928                 return -EIO;
4929
4930         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4931
4932         if (p->block_id == ID_SYNCER) {
4933                 dec_rs_pending(mdev);
4934                 drbd_rs_failed_io(mdev, sector, size);
4935                 return 0;
4936         }
4937
4938         err = validate_req_change_req_state(mdev, p->block_id, sector,
4939                                             &mdev->write_requests, __func__,
4940                                             NEG_ACKED, true);
4941         if (err) {
4942                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4943                    The master bio might already be completed, therefore the
4944                    request is no longer in the collision hash. */
4945                 /* In Protocol B we might already have got a P_RECV_ACK
4946                    but then get a P_NEG_ACK afterwards. */
4947                 drbd_set_out_of_sync(mdev, sector, size);
4948         }
4949         return 0;
4950 }
4951
4952 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4953 {
4954         struct drbd_conf *mdev;
4955         struct p_block_ack *p = pi->data;
4956         sector_t sector = be64_to_cpu(p->sector);
4957
4958         mdev = vnr_to_mdev(tconn, pi->vnr);
4959         if (!mdev)
4960                 return -EIO;
4961
4962         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4963
4964         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4965             (unsigned long long)sector, be32_to_cpu(p->blksize));
4966
4967         return validate_req_change_req_state(mdev, p->block_id, sector,
4968                                              &mdev->read_requests, __func__,
4969                                              NEG_ACKED, false);
4970 }
4971
4972 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4973 {
4974         struct drbd_conf *mdev;
4975         sector_t sector;
4976         int size;
4977         struct p_block_ack *p = pi->data;
4978
4979         mdev = vnr_to_mdev(tconn, pi->vnr);
4980         if (!mdev)
4981                 return -EIO;
4982
4983         sector = be64_to_cpu(p->sector);
4984         size = be32_to_cpu(p->blksize);
4985
4986         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4987
4988         dec_rs_pending(mdev);
4989
4990         if (get_ldev_if_state(mdev, D_FAILED)) {
4991                 drbd_rs_complete_io(mdev, sector);
4992                 switch (pi->cmd) {
4993                 case P_NEG_RS_DREPLY:
4994                         drbd_rs_failed_io(mdev, sector, size);
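                        /* fall through */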
4995                 case P_RS_CANCEL:
4996                         break;
4997                 default:
4998                         BUG();
4999                 }
5000                 put_ldev(mdev);
5001         }
5002
5003         return 0;
5004 }
5005
5006 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5007 {
5008         struct drbd_conf *mdev;
5009         struct p_barrier_ack *p = pi->data;
5010
5011         mdev = vnr_to_mdev(tconn, pi->vnr);
5012         if (!mdev)
5013                 return -EIO;
5014
5015         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
5016
5017         if (mdev->state.conn == C_AHEAD &&
5018             atomic_read(&mdev->ap_in_flight) == 0 &&
5019             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5020                 mdev->start_resync_timer.expires = jiffies + HZ;
5021                 add_timer(&mdev->start_resync_timer);
5022         }
5023
5024         return 0;
5025 }
5026
5027 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5028 {
5029         struct drbd_conf *mdev;
5030         struct p_block_ack *p = pi->data;
5031         struct drbd_work *w;
5032         sector_t sector;
5033         int size;
5034
5035         mdev = vnr_to_mdev(tconn, pi->vnr);
5036         if (!mdev)
5037                 return -EIO;
5038
5039         sector = be64_to_cpu(p->sector);
5040         size = be32_to_cpu(p->blksize);
5041
5042         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5043
5044         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5045                 drbd_ov_out_of_sync_found(mdev, sector, size);
5046         else
5047                 ov_out_of_sync_print(mdev);
5048
5049         if (!get_ldev(mdev))
5050                 return 0;
5051
5052         drbd_rs_complete_io(mdev, sector);
5053         dec_rs_pending(mdev);
5054
5055         --mdev->ov_left;
5056
5057         /* let's advance progress step marks only for every other megabyte */
5058         if ((mdev->ov_left & 0x200) == 0x200)
5059                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5060
5061         if (mdev->ov_left == 0) {
5062                 w = kmalloc(sizeof(*w), GFP_NOIO);
5063                 if (w) {
5064                         w->cb = w_ov_finished;
5065                         w->mdev = mdev;
5066                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5067                 } else {
5068                         dev_err(DEV, "kmalloc(w) failed.\n");
5069                         ov_out_of_sync_print(mdev);
5070                         drbd_resync_finished(mdev);
5071                 }
5072         }
5073         put_ldev(mdev);
5074         return 0;
5075 }
5076
5077 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5078 {
5079         return 0;
5080 }
5081
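/* Drain the done_ee lists of all volumes on this connection: temporarily
 * clear SIGNAL_ASENDER so the flush is not interrupted, call
 * drbd_finish_peer_reqs() for each volume, and repeat until no volume has
 * completed peer requests left.  Returns 1 on error, 0 once everything is
 * drained. */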
5082 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5083 {
5084         struct drbd_conf *mdev;
5085         int vnr, not_empty = 0;
5086
5087         do {
5088                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5089                 flush_signals(current);
5090
5091                 rcu_read_lock();
5092                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5093                         kref_get(&mdev->kref);
5094                         rcu_read_unlock();
5095                         if (drbd_finish_peer_reqs(mdev)) {
5096                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5097                                 return 1;
5098                         }
5099                         kref_put(&mdev->kref, &drbd_minor_destroy);
5100                         rcu_read_lock();
5101                 }
5102                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5103
5104                 spin_lock_irq(&tconn->req_lock);
5105                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5106                         not_empty = !list_empty(&mdev->done_ee);
5107                         if (not_empty)
5108                                 break;
5109                 }
5110                 spin_unlock_irq(&tconn->req_lock);
5111                 rcu_read_unlock();
5112         } while (not_empty);
5113
5114         return 0;
5115 }
5116
5117 struct asender_cmd {
5118         size_t pkt_size;
5119         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5120 };
5121
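/* Dispatch table for packets arriving on the meta socket (acknowledgements
 * and pings), indexed by packet type; pkt_size is the payload the asender
 * expects after the header before it invokes fn(). */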
5122 static struct asender_cmd asender_tbl[] = {
5123         [P_PING]            = { 0, got_Ping },
5124         [P_PING_ACK]        = { 0, got_PingAck },
5125         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5126         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5127         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5128         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5129         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5130         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5131         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5132         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5133         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5134         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5135         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5136         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5137         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5138         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5139         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5140 };
5141
5142 int drbd_asender(struct drbd_thread *thi)
5143 {
5144         struct drbd_tconn *tconn = thi->tconn;
5145         struct asender_cmd *cmd = NULL;
5146         struct packet_info pi;
5147         int rv;
5148         void *buf    = tconn->meta.rbuf;
5149         int received = 0;
5150         unsigned int header_size = drbd_header_size(tconn);
5151         int expect   = header_size;
5152         bool ping_timeout_active = false;
5153         struct net_conf *nc;
5154         int ping_timeo, tcp_cork, ping_int;
5155
5156         current->policy = SCHED_RR;  /* Make this a realtime task! */
5157         current->rt_priority = 2;    /* more important than all other tasks */
5158
5159         while (get_t_state(thi) == RUNNING) {
5160                 drbd_thread_current_set_cpu(thi);
5161
5162                 rcu_read_lock();
5163                 nc = rcu_dereference(tconn->net_conf);
5164                 ping_timeo = nc->ping_timeo;
5165                 tcp_cork = nc->tcp_cork;
5166                 ping_int = nc->ping_int;
5167                 rcu_read_unlock();
5168
5169                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5170                         if (drbd_send_ping(tconn)) {
5171                                 conn_err(tconn, "drbd_send_ping has failed\n");
5172                                 goto reconnect;
5173                         }
5174                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5175                         ping_timeout_active = true;
5176                 }
5177
5178                 /* TODO: conditionally cork; it may hurt latency if we cork without
5179                    much to send */
5180                 if (tcp_cork)
5181                         drbd_tcp_cork(tconn->meta.socket);
5182                 if (tconn_finish_peer_reqs(tconn)) {
5183                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5184                         goto reconnect;
5185                 }
5186                 /* but unconditionally uncork unless disabled */
5187                 if (tcp_cork)
5188                         drbd_tcp_uncork(tconn->meta.socket);
5189
5190                 /* short circuit, recv_msg would return EINTR anyways. */
5191                 if (signal_pending(current))
5192                         continue;
5193
5194                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5195                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5196
5197                 flush_signals(current);
5198
5199                 /* Note:
5200                  * -EINTR        (on meta) we got a signal
5201                  * -EAGAIN       (on meta) rcvtimeo expired
5202                  * -ECONNRESET   other side closed the connection
5203                  * -ERESTARTSYS  (on data) we got a signal
5204                  * rv <  0       other than above: unexpected error!
5205                  * rv == expected: full header or command
5206                  * rv <  expected: "woken" by signal during receive
5207                  * rv == 0       : "connection shut down by peer"
5208                  */
5209                 if (likely(rv > 0)) {
5210                         received += rv;
5211                         buf      += rv;
5212                 } else if (rv == 0) {
5213                         conn_err(tconn, "meta connection shut down by peer.\n");
5214                         goto reconnect;
5215                 } else if (rv == -EAGAIN) {
5216                         /* If the data socket received something meanwhile,
5217                          * that is good enough: peer is still alive. */
5218                         if (time_after(tconn->last_received,
5219                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5220                                 continue;
5221                         if (ping_timeout_active) {
5222                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5223                                 goto reconnect;
5224                         }
5225                         set_bit(SEND_PING, &tconn->flags);
5226                         continue;
5227                 } else if (rv == -EINTR) {
5228                         continue;
5229                 } else {
5230                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5231                         goto reconnect;
5232                 }
5233
5234                 if (received == expect && cmd == NULL) {
5235                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5236                                 goto reconnect;
5237                         cmd = &asender_tbl[pi.cmd];
5238                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5239                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5240                                          cmdname(pi.cmd), pi.cmd);
5241                                 goto disconnect;
5242                         }
5243                         expect = header_size + cmd->pkt_size;
5244                         if (pi.size != expect - header_size) {
5245                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5246                                         pi.cmd, pi.size);
5247                                 goto reconnect;
5248                         }
5249                 }
5250                 if (received == expect) {
5251                         bool err;
5252
5253                         err = cmd->fn(tconn, &pi);
5254                         if (err) {
5255                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5256                                 goto reconnect;
5257                         }
5258
5259                         tconn->last_received = jiffies;
5260
5261                         if (cmd == &asender_tbl[P_PING_ACK]) {
5262                                 /* restore idle timeout */
5263                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5264                                 ping_timeout_active = false;
5265                         }
5266
5267                         buf      = tconn->meta.rbuf;
5268                         received = 0;
5269                         expect   = header_size;
5270                         cmd      = NULL;
5271                 }
5272         }
5273
5274         if (0) {
5275 reconnect:
5276                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5277         }
5278         if (0) {
5279 disconnect:
5280                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5281         }
5282         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5283
5284         conn_info(tconn, "asender terminated\n");
5285
5286         return 0;
5287 }