drbd: Fix a potential write ordering issue on SyncTarget nodes
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
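/*
 * Informal sketch of the chain representation used below: page_chain_next()
 * (defined in drbd_int.h) is essentially (struct page *)page_private(page),
 * so a manual walk would look roughly like
 *
 *	for (; page; page = (struct page *)page_private(page))
 *		;
 *
 * A private value of 0 terminates the chain; page_chain_del() below sets
 * that terminator explicitly on the tail of the chain it returns.
 */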
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
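/*
 * Illustrative pairing with drbd_free_pages() below (expected usage, not a
 * new interface):
 *
 *	page = drbd_alloc_pages(mdev, nr_pages, true);	// may sleep and retry
 *	if (!page)
 *		return NULL;	// signalled, or over max_buffers and !retry
 *	...
 *	drbd_free_pages(mdev, page, 0);			// drops pp_in_use again
 *
 * Every successful allocation bumps mdev->pp_in_use; the matching free
 * decrements it and wakes up drbd_pp_wait.
 */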
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
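/*
 * Minimal sketch of those rules (see drbd_wait_ee_list_empty() further
 * down, which wraps exactly this pattern):
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);	// lock held
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 *	drbd_wait_ee_list_empty(mdev, &mdev->active_ee);	// takes the lock itself
 */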
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490         __module_get((*newsock)->ops->owner);
491
492 out:
493         return err;
494 }
495
496 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
497 {
498         mm_segment_t oldfs;
499         struct kvec iov = {
500                 .iov_base = buf,
501                 .iov_len = size,
502         };
503         struct msghdr msg = {
504                 .msg_iovlen = 1,
505                 .msg_iov = (struct iovec *)&iov,
506                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
507         };
508         int rv;
509
510         oldfs = get_fs();
511         set_fs(KERNEL_DS);
512         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
513         set_fs(oldfs);
514
515         return rv;
516 }
517
518 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
519 {
520         mm_segment_t oldfs;
521         struct kvec iov = {
522                 .iov_base = buf,
523                 .iov_len = size,
524         };
525         struct msghdr msg = {
526                 .msg_iovlen = 1,
527                 .msg_iov = (struct iovec *)&iov,
528                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
529         };
530         int rv;
531
532         oldfs = get_fs();
533         set_fs(KERNEL_DS);
534
535         for (;;) {
536                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
537                 if (rv == size)
538                         break;
539
540                 /* Note:
541                  * ECONNRESET   other side closed the connection
542                  * ERESTARTSYS  (on  sock) we got a signal
543                  */
544
545                 if (rv < 0) {
546                         if (rv == -ECONNRESET)
547                                 conn_info(tconn, "sock was reset by peer\n");
548                         else if (rv != -ERESTARTSYS)
549                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
550                         break;
551                 } else if (rv == 0) {
552                         conn_info(tconn, "sock was shut down by peer\n");
553                         break;
554                 } else  {
555                         /* signal came in, or peer/link went down,
556                          * after we read a partial message
557                          */
558                         /* D_ASSERT(signal_pending(current)); */
559                         break;
560                 }
561         }
562
563         set_fs(oldfs);
564
565         if (rv != size)
566                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
567
568         return rv;
569 }
570
571 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
572 {
573         int err;
574
575         err = drbd_recv(tconn, buf, size);
576         if (err != size) {
577                 if (err >= 0)
578                         err = -EIO;
579         } else
580                 err = 0;
581         return err;
582 }
583
584 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
585 {
586         int err;
587
588         err = drbd_recv_all(tconn, buf, size);
589         if (err && !signal_pending(current))
590                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
591         return err;
592 }
593
594 /* quoting tcp(7):
595  *   On individual connections, the socket buffer size must be set prior to the
596  *   listen(2) or connect(2) calls in order to have it take effect.
597  * This is our wrapper to do so.
598  */
599 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
600                 unsigned int rcv)
601 {
602         /* open coded SO_SNDBUF, SO_RCVBUF */
603         if (snd) {
604                 sock->sk->sk_sndbuf = snd;
605                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
606         }
607         if (rcv) {
608                 sock->sk->sk_rcvbuf = rcv;
609                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
610         }
611 }
612
613 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
614 {
615         const char *what;
616         struct socket *sock;
617         struct sockaddr_in6 src_in6;
618         struct sockaddr_in6 peer_in6;
619         struct net_conf *nc;
620         int err, peer_addr_len, my_addr_len;
621         int sndbuf_size, rcvbuf_size, connect_int;
622         int disconnect_on_error = 1;
623
624         rcu_read_lock();
625         nc = rcu_dereference(tconn->net_conf);
626         if (!nc) {
627                 rcu_read_unlock();
628                 return NULL;
629         }
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         connect_int = nc->connect_int;
633         rcu_read_unlock();
634
635         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
636         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
637
638         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
639                 src_in6.sin6_port = 0;
640         else
641                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
644         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
645
646         what = "sock_create_kern";
647         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
648                                SOCK_STREAM, IPPROTO_TCP, &sock);
649         if (err < 0) {
650                 sock = NULL;
651                 goto out;
652         }
653
654         sock->sk->sk_rcvtimeo =
655         sock->sk->sk_sndtimeo = connect_int * HZ;
656         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
657
658        /* explicitly bind to the configured IP as source IP
659         *  for the outgoing connections.
660         *  This is needed for multihomed hosts and to be
661         *  able to use lo: interfaces for drbd.
662         * Make sure to use 0 as port number, so linux selects
663         *  a free one dynamically.
664         */
665         what = "bind before connect";
666         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
667         if (err < 0)
668                 goto out;
669
670         /* connect may fail, peer not yet available.
671          * stay C_WF_CONNECTION, don't go Disconnecting! */
672         disconnect_on_error = 0;
673         what = "connect";
674         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
675
676 out:
677         if (err < 0) {
678                 if (sock) {
679                         sock_release(sock);
680                         sock = NULL;
681                 }
682                 switch (-err) {
683                         /* timeout, busy, signal pending */
684                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
685                 case EINTR: case ERESTARTSYS:
686                         /* peer not (yet) available, network problem */
687                 case ECONNREFUSED: case ENETUNREACH:
688                 case EHOSTDOWN:    case EHOSTUNREACH:
689                         disconnect_on_error = 0;
690                         break;
691                 default:
692                         conn_err(tconn, "%s failed, err = %d\n", what, err);
693                 }
694                 if (disconnect_on_error)
695                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
696         }
697
698         return sock;
699 }
700
701 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
702 {
703         int timeo, err, my_addr_len;
704         int sndbuf_size, rcvbuf_size, connect_int;
705         struct socket *s_estab = NULL, *s_listen;
706         struct sockaddr_in6 my_addr;
707         struct net_conf *nc;
708         const char *what;
709
710         rcu_read_lock();
711         nc = rcu_dereference(tconn->net_conf);
712         if (!nc) {
713                 rcu_read_unlock();
714                 return NULL;
715         }
716         sndbuf_size = nc->sndbuf_size;
717         rcvbuf_size = nc->rcvbuf_size;
718         connect_int = nc->connect_int;
719         rcu_read_unlock();
720
721         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
722         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
723
724         what = "sock_create_kern";
725         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
726                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
727         if (err) {
728                 s_listen = NULL;
729                 goto out;
730         }
731
732         timeo = connect_int * HZ;
733         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
734
735         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
736         s_listen->sk->sk_rcvtimeo = timeo;
737         s_listen->sk->sk_sndtimeo = timeo;
738         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
739
740         what = "bind before listen";
741         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
742         if (err < 0)
743                 goto out;
744
745         err = drbd_accept(&what, s_listen, &s_estab);
746
747 out:
748         if (s_listen)
749                 sock_release(s_listen);
750         if (err < 0) {
751                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
752                         conn_err(tconn, "%s failed, err = %d\n", what, err);
753                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
754                 }
755         }
756
757         return s_estab;
758 }
759
760 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
761
762 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
763                              enum drbd_packet cmd)
764 {
765         if (!conn_prepare_command(tconn, sock))
766                 return -EIO;
767         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
768 }
769
770 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
771 {
772         unsigned int header_size = drbd_header_size(tconn);
773         struct packet_info pi;
774         int err;
775
776         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
777         if (err != header_size) {
778                 if (err >= 0)
779                         err = -EIO;
780                 return err;
781         }
782         err = decode_header(tconn, tconn->data.rbuf, &pi);
783         if (err)
784                 return err;
785         return pi.cmd;
786 }
787
788 /**
789  * drbd_socket_okay() - Free the socket if its connection is not okay
790  * @sock:       pointer to the pointer to the socket.
791  */
792 static int drbd_socket_okay(struct socket **sock)
793 {
794         int rr;
795         char tb[4];
796
797         if (!*sock)
798                 return false;
799
800         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
801
802         if (rr > 0 || rr == -EAGAIN) {
803                 return true;
804         } else {
805                 sock_release(*sock);
806                 *sock = NULL;
807                 return false;
808         }
809 }
810 /* Gets called if a connection is established, or if a new minor gets created
811    in a connection */
812 int drbd_connected(struct drbd_conf *mdev)
813 {
814         int err;
815
816         atomic_set(&mdev->packet_seq, 0);
817         mdev->peer_seq = 0;
818
819         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
820                 &mdev->tconn->cstate_mutex :
821                 &mdev->own_state_mutex;
822
823         err = drbd_send_sync_param(mdev);
824         if (!err)
825                 err = drbd_send_sizes(mdev, 0, 0);
826         if (!err)
827                 err = drbd_send_uuids(mdev);
828         if (!err)
829                 err = drbd_send_current_state(mdev);
830         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
831         clear_bit(RESIZE_PENDING, &mdev->flags);
832         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
833         return err;
834 }
835
836 /*
837  * return values:
838  *   1 yes, we have a valid connection
839  *   0 oops, did not work out, please try again
840  *  -1 peer talks different language,
841  *     no point in trying again, please go standalone.
842  *  -2 We do not have a network config...
843  */
844 static int conn_connect(struct drbd_tconn *tconn)
845 {
846         struct drbd_socket sock, msock;
847         struct drbd_conf *mdev;
848         struct net_conf *nc;
849         int vnr, timeout, try, h, ok;
850         bool discard_my_data;
851
852         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
853                 return -2;
854
855         mutex_init(&sock.mutex);
856         sock.sbuf = tconn->data.sbuf;
857         sock.rbuf = tconn->data.rbuf;
858         sock.socket = NULL;
859         mutex_init(&msock.mutex);
860         msock.sbuf = tconn->meta.sbuf;
861         msock.rbuf = tconn->meta.rbuf;
862         msock.socket = NULL;
863
864         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
865
866         /* Assume that the peer only understands protocol 80 until we know better.  */
867         tconn->agreed_pro_version = 80;
868
869         do {
870                 struct socket *s;
871
872                 for (try = 0;;) {
873                         /* 3 tries, this should take less than a second! */
874                         s = drbd_try_connect(tconn);
875                         if (s || ++try >= 3)
876                                 break;
877                         /* give the other side time to call bind() & listen() */
878                         schedule_timeout_interruptible(HZ / 10);
879                 }
880
881                 if (s) {
882                         if (!sock.socket) {
883                                 sock.socket = s;
884                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
885                         } else if (!msock.socket) {
886                                 msock.socket = s;
887                                 send_first_packet(tconn, &msock, P_INITIAL_META);
888                         } else {
889                                 conn_err(tconn, "Logic error in conn_connect()\n");
890                                 goto out_release_sockets;
891                         }
892                 }
893
894                 if (sock.socket && msock.socket) {
895                         rcu_read_lock();
896                         nc = rcu_dereference(tconn->net_conf);
897                         timeout = nc->ping_timeo * HZ / 10;
898                         rcu_read_unlock();
899                         schedule_timeout_interruptible(timeout);
900                         ok = drbd_socket_okay(&sock.socket);
901                         ok = drbd_socket_okay(&msock.socket) && ok;
902                         if (ok)
903                                 break;
904                 }
905
906 retry:
907                 s = drbd_wait_for_connect(tconn);
908                 if (s) {
909                         try = receive_first_packet(tconn, s);
910                         drbd_socket_okay(&sock.socket);
911                         drbd_socket_okay(&msock.socket);
912                         switch (try) {
913                         case P_INITIAL_DATA:
914                                 if (sock.socket) {
915                                         conn_warn(tconn, "initial packet S crossed\n");
916                                         sock_release(sock.socket);
917                                 }
918                                 sock.socket = s;
919                                 break;
920                         case P_INITIAL_META:
921                                 if (msock.socket) {
922                                         conn_warn(tconn, "initial packet M crossed\n");
923                                         sock_release(msock.socket);
924                                 }
925                                 msock.socket = s;
926                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
927                                 break;
928                         default:
929                                 conn_warn(tconn, "Error receiving initial packet\n");
930                                 sock_release(s);
931                                 if (random32() & 1)
932                                         goto retry;
933                         }
934                 }
935
936                 if (tconn->cstate <= C_DISCONNECTING)
937                         goto out_release_sockets;
938                 if (signal_pending(current)) {
939                         flush_signals(current);
940                         smp_rmb();
941                         if (get_t_state(&tconn->receiver) == EXITING)
942                                 goto out_release_sockets;
943                 }
944
945                 if (sock.socket && msock.socket) {
946                         ok = drbd_socket_okay(&sock.socket);
947                         ok = drbd_socket_okay(&msock.socket) && ok;
948                         if (ok)
949                                 break;
950                 }
951         } while (1);
952
953         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
954         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
955
956         sock.socket->sk->sk_allocation = GFP_NOIO;
957         msock.socket->sk->sk_allocation = GFP_NOIO;
958
959         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
960         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
961
962         /* NOT YET ...
963          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
964          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
965          * first set it to the P_CONNECTION_FEATURES timeout,
966          * which we set to 4x the configured ping_timeout. */
967         rcu_read_lock();
968         nc = rcu_dereference(tconn->net_conf);
969
970         sock.socket->sk->sk_sndtimeo =
971         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
972
973         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
974         timeout = nc->timeout * HZ / 10;
975         discard_my_data = nc->discard_my_data;
976         rcu_read_unlock();
977
978         msock.socket->sk->sk_sndtimeo = timeout;
979
980         /* we don't want delays.
981          * we use TCP_CORK where appropriate, though */
982         drbd_tcp_nodelay(sock.socket);
983         drbd_tcp_nodelay(msock.socket);
984
985         tconn->data.socket = sock.socket;
986         tconn->meta.socket = msock.socket;
987         tconn->last_received = jiffies;
988
989         h = drbd_do_features(tconn);
990         if (h <= 0)
991                 return h;
992
993         if (tconn->cram_hmac_tfm) {
994                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
995                 switch (drbd_do_auth(tconn)) {
996                 case -1:
997                         conn_err(tconn, "Authentication of peer failed\n");
998                         return -1;
999                 case 0:
1000                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1001                         return 0;
1002                 }
1003         }
1004
1005         tconn->data.socket->sk->sk_sndtimeo = timeout;
1006         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1007
1008         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1009                 return -1;
1010
1011         rcu_read_lock();
1012         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1013                 kref_get(&mdev->kref);
1014                 rcu_read_unlock();
1015
1016                 if (discard_my_data)
1017                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1018                 else
1019                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1020
1021                 drbd_connected(mdev);
1022                 kref_put(&mdev->kref, &drbd_minor_destroy);
1023                 rcu_read_lock();
1024         }
1025         rcu_read_unlock();
1026
1027         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1028                 return 0;
1029
1030         drbd_thread_start(&tconn->asender);
1031
1032         mutex_lock(&tconn->conf_update);
1033         /* The discard_my_data flag is a single-shot modifier to the next
1034          * connection attempt, the handshake of which is now well underway.
1035          * No need for rcu style copying of the whole struct
1036          * just to clear a single value. */
1037         tconn->net_conf->discard_my_data = 0;
1038         mutex_unlock(&tconn->conf_update);
1039
1040         return h;
1041
1042 out_release_sockets:
1043         if (sock.socket)
1044                 sock_release(sock.socket);
1045         if (msock.socket)
1046                 sock_release(msock.socket);
1047         return -1;
1048 }
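/*
 * Informal summary of the handshake above: both peers connect actively and
 * listen at the same time; whichever socket's first packet is P_INITIAL_DATA
 * becomes the data socket, P_INITIAL_META selects the meta socket, and
 * crossed duplicates are released again.  Roughly:
 *
 *	s = drbd_try_connect(tconn);		// or drbd_wait_for_connect()
 *	switch (receive_first_packet(tconn, s)) {
 *	case P_INITIAL_DATA: sock.socket  = s; break;
 *	case P_INITIAL_META: msock.socket = s; break;
 *	}
 */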
1049
1050 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1051 {
1052         unsigned int header_size = drbd_header_size(tconn);
1053
1054         if (header_size == sizeof(struct p_header100) &&
1055             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1056                 struct p_header100 *h = header;
1057                 if (h->pad != 0) {
1058                         conn_err(tconn, "Header padding is not zero\n");
1059                         return -EINVAL;
1060                 }
1061                 pi->vnr = be16_to_cpu(h->volume);
1062                 pi->cmd = be16_to_cpu(h->command);
1063                 pi->size = be32_to_cpu(h->length);
1064         } else if (header_size == sizeof(struct p_header95) &&
1065                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1066                 struct p_header95 *h = header;
1067                 pi->cmd = be16_to_cpu(h->command);
1068                 pi->size = be32_to_cpu(h->length);
1069                 pi->vnr = 0;
1070         } else if (header_size == sizeof(struct p_header80) &&
1071                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1072                 struct p_header80 *h = header;
1073                 pi->cmd = be16_to_cpu(h->command);
1074                 pi->size = be16_to_cpu(h->length);
1075                 pi->vnr = 0;
1076         } else {
1077                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1078                          be32_to_cpu(*(__be32 *)header),
1079                          tconn->agreed_pro_version);
1080                 return -EINVAL;
1081         }
1082         pi->data = header + header_size;
1083         return 0;
1084 }
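/*
 * The three header generations decoded above, as far as the accessors let
 * us infer them (the exact struct layout lives in the protocol headers):
 *
 *	p_header100: 32-bit magic, 16-bit volume, 16-bit command, 32-bit length, plus padding
 *	p_header95:  16-bit magic, 16-bit command, 32-bit length
 *	p_header80:  32-bit magic, 16-bit command, 16-bit length
 *
 * Which flavour is expected follows from drbd_header_size(), i.e. from the
 * agreed protocol version.
 */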
1085
1086 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1087 {
1088         void *buffer = tconn->data.rbuf;
1089         int err;
1090
1091         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1092         if (err)
1093                 return err;
1094
1095         err = decode_header(tconn, buffer, pi);
1096         tconn->last_received = jiffies;
1097
1098         return err;
1099 }
1100
1101 static void drbd_flush(struct drbd_tconn *tconn)
1102 {
1103         int rv;
1104         struct drbd_conf *mdev;
1105         int vnr;
1106
1107         if (tconn->write_ordering >= WO_bdev_flush) {
1108                 rcu_read_lock();
1109                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1110                         if (!get_ldev(mdev))
1111                                 continue;
1112                         kref_get(&mdev->kref);
1113                         rcu_read_unlock();
1114
1115                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1116                                         GFP_NOIO, NULL);
1117                         if (rv) {
1118                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1119                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1120                                  * don't try again for ANY return value != 0
1121                                  * if (rv == -EOPNOTSUPP) */
1122                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1123                         }
1124                         put_ldev(mdev);
1125                         kref_put(&mdev->kref, &drbd_minor_destroy);
1126
1127                         rcu_read_lock();
1128                         if (rv)
1129                                 break;
1130                 }
1131                 rcu_read_unlock();
1132         }
1133 }
1134
1135 /**
1136  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1137  * @tconn:      DRBD connection.
1138  * @epoch:      Epoch object.
1139  * @ev:         Epoch event.
1140  */
1141 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1142                                                struct drbd_epoch *epoch,
1143                                                enum epoch_event ev)
1144 {
1145         int epoch_size;
1146         struct drbd_epoch *next_epoch;
1147         enum finish_epoch rv = FE_STILL_LIVE;
1148
1149         spin_lock(&tconn->epoch_lock);
1150         do {
1151                 next_epoch = NULL;
1152
1153                 epoch_size = atomic_read(&epoch->epoch_size);
1154
1155                 switch (ev & ~EV_CLEANUP) {
1156                 case EV_PUT:
1157                         atomic_dec(&epoch->active);
1158                         break;
1159                 case EV_GOT_BARRIER_NR:
1160                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1161                         break;
1162                 case EV_BECAME_LAST:
1163                         /* nothing to do */
1164                         break;
1165                 }
1166
1167                 if (epoch_size != 0 &&
1168                     atomic_read(&epoch->active) == 0 &&
1169                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1170                         if (!(ev & EV_CLEANUP)) {
1171                                 spin_unlock(&tconn->epoch_lock);
1172                                 drbd_send_b_ack(epoch->mdev, epoch->barrier_nr, epoch_size);
1173                                 spin_lock(&tconn->epoch_lock);
1174                         }
1175                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1176                                 dec_unacked(epoch->mdev);
1177
1178                         if (tconn->current_epoch != epoch) {
1179                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1180                                 list_del(&epoch->list);
1181                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1182                                 tconn->epochs--;
1183                                 kfree(epoch);
1184
1185                                 if (rv == FE_STILL_LIVE)
1186                                         rv = FE_DESTROYED;
1187                         } else {
1188                                 epoch->flags = 0;
1189                                 atomic_set(&epoch->epoch_size, 0);
1190                                 /* atomic_set(&epoch->active, 0); is already zero */
1191                                 if (rv == FE_STILL_LIVE)
1192                                         rv = FE_RECYCLED;
1193                         }
1194                 }
1195
1196                 if (!next_epoch)
1197                         break;
1198
1199                 epoch = next_epoch;
1200         } while (1);
1201
1202         spin_unlock(&tconn->epoch_lock);
1203
1204         return rv;
1205 }
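/*
 * Informal epoch life cycle handled above: writes accumulate in
 * tconn->current_epoch (epoch_size and active grow), EV_PUT drops the
 * active count as writes complete, EV_GOT_BARRIER_NR records that the
 * peer's barrier number arrived, and once size != 0, active == 0 and the
 * barrier number is known, the epoch is either destroyed (it has a
 * successor in the list) or recycled in place (it is still current_epoch).
 */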
1206
1207 /**
1208  * drbd_bump_write_ordering() - Fall back to another write ordering method
1209  * @tconn:      DRBD connection.
1210  * @wo:         Write ordering method to try.
1211  */
1212 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1213 {
1214         struct disk_conf *dc;
1215         struct drbd_conf *mdev;
1216         enum write_ordering_e pwo;
1217         int vnr;
1218         static char *write_ordering_str[] = {
1219                 [WO_none] = "none",
1220                 [WO_drain_io] = "drain",
1221                 [WO_bdev_flush] = "flush",
1222         };
1223
1224         pwo = tconn->write_ordering;
1225         wo = min(pwo, wo);
1226         rcu_read_lock();
1227         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1228                 if (!get_ldev(mdev))
1229                         continue;
1230                 dc = rcu_dereference(mdev->ldev->disk_conf);
1231
1232                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1233                         wo = WO_drain_io;
1234                 if (wo == WO_drain_io && !dc->disk_drain)
1235                         wo = WO_none;
1236                 put_ldev(mdev);
1237         }
1238         rcu_read_unlock();
1239         tconn->write_ordering = wo;
1240         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1241                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1242 }
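/*
 * Note on direction: wo = min(pwo, wo) above means this function can only
 * ever weaken the method (WO_bdev_flush -> WO_drain_io -> WO_none), never
 * silently upgrade it; the per-volume disk_flushes/disk_drain settings may
 * weaken it further.
 */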
1243
1244 /**
1245  * drbd_submit_peer_request() - Submit @peer_req to the local backing device
1246  * @mdev:       DRBD device.
1247  * @peer_req:   peer request
1248  * @rw:         flag field, see bio->bi_rw
1249  *
1250  * May spread the pages to multiple bios,
1251  * depending on bio_add_page restrictions.
1252  *
1253  * Returns 0 if all bios have been submitted,
1254  * -ENOMEM if we could not allocate enough bios,
1255  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1256  *  single page to an empty bio (which should never happen and likely indicates
1257  *  that the lower level IO stack is in some way broken). This has been observed
1258  *  on certain Xen deployments.
1259  */
1260 /* TODO allocate from our own bio_set. */
1261 int drbd_submit_peer_request(struct drbd_conf *mdev,
1262                              struct drbd_peer_request *peer_req,
1263                              const unsigned rw, const int fault_type)
1264 {
1265         struct bio *bios = NULL;
1266         struct bio *bio;
1267         struct page *page = peer_req->pages;
1268         sector_t sector = peer_req->i.sector;
1269         unsigned ds = peer_req->i.size;
1270         unsigned n_bios = 0;
1271         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1272         int err = -ENOMEM;
1273
1274         /* In most cases, we will only need one bio.  But in case the lower
1275          * level restrictions happen to be different at this offset on this
1276          * side than those of the sending peer, we may need to submit the
1277          * request in more than one bio.
1278          *
1279          * Plain bio_alloc is good enough here, this is no DRBD internally
1280          * generated bio, but a bio allocated on behalf of the peer.
1281          */
1282 next_bio:
1283         bio = bio_alloc(GFP_NOIO, nr_pages);
1284         if (!bio) {
1285                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1286                 goto fail;
1287         }
1288         /* > peer_req->i.sector, unless this is the first bio */
1289         bio->bi_sector = sector;
1290         bio->bi_bdev = mdev->ldev->backing_bdev;
1291         bio->bi_rw = rw;
1292         bio->bi_private = peer_req;
1293         bio->bi_end_io = drbd_peer_request_endio;
1294
1295         bio->bi_next = bios;
1296         bios = bio;
1297         ++n_bios;
1298
1299         page_chain_for_each(page) {
1300                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1301                 if (!bio_add_page(bio, page, len, 0)) {
1302                         /* A single page must always be possible!
1303                          * But in case it fails anyway,
1304                          * we deal with it, and complain (below). */
1305                         if (bio->bi_vcnt == 0) {
1306                                 dev_err(DEV,
1307                                         "bio_add_page failed for len=%u, "
1308                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1309                                         len, (unsigned long long)bio->bi_sector);
1310                                 err = -ENOSPC;
1311                                 goto fail;
1312                         }
1313                         goto next_bio;
1314                 }
1315                 ds -= len;
1316                 sector += len >> 9;
1317                 --nr_pages;
1318         }
1319         D_ASSERT(page == NULL);
1320         D_ASSERT(ds == 0);
1321
1322         atomic_set(&peer_req->pending_bios, n_bios);
1323         do {
1324                 bio = bios;
1325                 bios = bios->bi_next;
1326                 bio->bi_next = NULL;
1327
1328                 drbd_generic_make_request(mdev, fault_type, bio);
1329         } while (bios);
1330         return 0;
1331
1332 fail:
1333         while (bios) {
1334                 bio = bios;
1335                 bios = bios->bi_next;
1336                 bio_put(bio);
1337         }
1338         return err;
1339 }
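/*
 * Splitting sketch: if bio_add_page() refuses a page because the local
 * queue limits differ from the sender's, the partially filled bio stays on
 * the private "bios" list and a fresh bio continues at the advanced sector.
 * Nothing is submitted until the whole page chain has been mapped, so the
 * request is either issued completely or failed as a whole.
 */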
1340
1341 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1342                                              struct drbd_peer_request *peer_req)
1343 {
1344         struct drbd_interval *i = &peer_req->i;
1345
1346         drbd_remove_interval(&mdev->write_requests, i);
1347         drbd_clear_interval(i);
1348
1349         /* Wake up any processes waiting for this peer request to complete.  */
1350         if (i->waiting)
1351                 wake_up(&mdev->misc_wait);
1352 }
1353
1354 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1355 {
1356         struct drbd_conf *mdev;
1357         int vnr;
1358
1359         rcu_read_lock();
1360         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1361                 kref_get(&mdev->kref);
1362                 rcu_read_unlock();
1363                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1364                 kref_put(&mdev->kref, &drbd_minor_destroy);
1365                 rcu_read_lock();
1366         }
1367         rcu_read_unlock();
1368 }
1369
1370 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1371 {
1372         struct drbd_conf *mdev;
1373         int rv;
1374         struct p_barrier *p = pi->data;
1375         struct drbd_epoch *epoch;
1376
1377         mdev = vnr_to_mdev(tconn, pi->vnr);
1378         if (!mdev)
1379                 return -EIO;
1380
1381         inc_unacked(mdev);
1382
1383         tconn->current_epoch->barrier_nr = p->barrier;
1384         tconn->current_epoch->mdev = mdev;
1385         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1386
1387         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1388          * the activity log, which means it would not be resynced in case the
1389          * R_PRIMARY crashes now.
1390          * Therefore we must send the barrier_ack after the barrier request was
1391          * completed. */
1392         switch (tconn->write_ordering) {
1393         case WO_none:
1394                 if (rv == FE_RECYCLED)
1395                         return 0;
1396
1397                 /* receiver context, in the writeout path of the other node.
1398                  * avoid potential distributed deadlock */
1399                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1400                 if (epoch)
1401                         break;
1402                 else
1403                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1404                         /* Fall through */
1405
1406         case WO_bdev_flush:
1407         case WO_drain_io:
1408                 conn_wait_active_ee_empty(tconn);
1409                 drbd_flush(tconn);
1410
1411                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1412                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1413                         if (epoch)
1414                                 break;
1415                 }
1416
1417                 epoch = tconn->current_epoch;
1418                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1419
1420                 D_ASSERT(atomic_read(&epoch->active) == 0);
1421                 D_ASSERT(epoch->flags == 0);
1422
1423                 return 0;
1424         default:
1425                 dev_err(DEV, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1426                 return -EIO;
1427         }
1428
1429         epoch->flags = 0;
1430         atomic_set(&epoch->epoch_size, 0);
1431         atomic_set(&epoch->active, 0);
1432
1433         spin_lock(&tconn->epoch_lock);
1434         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1435                 list_add(&epoch->list, &tconn->current_epoch->list);
1436                 tconn->current_epoch = epoch;
1437                 tconn->epochs++;
1438         } else {
1439                 /* The current_epoch got recycled while we allocated this one... */
1440                 kfree(epoch);
1441         }
1442         spin_unlock(&tconn->epoch_lock);
1443
1444         return 0;
1445 }
1446
1447 /* used from receive_RSDataReply (recv_resync_read)
1448  * and from receive_Data */
1449 static struct drbd_peer_request *
1450 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1451               int data_size) __must_hold(local)
1452 {
1453         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1454         struct drbd_peer_request *peer_req;
1455         struct page *page;
1456         int dgs, ds, err;
1457         void *dig_in = mdev->tconn->int_dig_in;
1458         void *dig_vv = mdev->tconn->int_dig_vv;
1459         unsigned long *data;
1460
1461         dgs = 0;
1462         if (mdev->tconn->peer_integrity_tfm) {
1463                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1464                 /*
1465                  * FIXME: Receive the incoming digest into the receive buffer
1466                  *        here, together with its struct p_data?
1467                  */
1468                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1469                 if (err)
1470                         return NULL;
1471                 data_size -= dgs;
1472         }
1473
1474         if (!expect(data_size != 0))
1475                 return NULL;
1476         if (!expect(IS_ALIGNED(data_size, 512)))
1477                 return NULL;
1478         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1479                 return NULL;
1480
1481         /* even though we trust our peer,
1482          * we sometimes have to double check. */
1483         if (sector + (data_size>>9) > capacity) {
1484                 dev_err(DEV, "request from peer beyond end of local disk: "
1485                         "capacity: %llus < sector: %llus + size: %u\n",
1486                         (unsigned long long)capacity,
1487                         (unsigned long long)sector, data_size);
1488                 return NULL;
1489         }
1490
1491         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1492          * "criss-cross" setup, that might cause write-out on some other DRBD,
1493          * which in turn might block on the other node at this very place.  */
1494         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1495         if (!peer_req)
1496                 return NULL;
1497
1498         ds = data_size;
1499         page = peer_req->pages;
1500         page_chain_for_each(page) {
1501                 unsigned len = min_t(int, ds, PAGE_SIZE);
1502                 data = kmap(page);
1503                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1504                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1505                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1506                         data[0] = data[0] ^ (unsigned long)-1;
1507                 }
1508                 kunmap(page);
1509                 if (err) {
1510                         drbd_free_peer_req(mdev, peer_req);
1511                         return NULL;
1512                 }
1513                 ds -= len;
1514         }
1515
1516         if (dgs) {
1517                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1518                 if (memcmp(dig_in, dig_vv, dgs)) {
1519                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1520                                 (unsigned long long)sector, data_size);
1521                         drbd_free_peer_req(mdev, peer_req);
1522                         return NULL;
1523                 }
1524         }
1525         mdev->recv_cnt += data_size>>9;
1526         return peer_req;
1527 }
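
/*
 * Worked example (assuming peer data integrity checking with a
 * hypothetical 20 byte digest): the payload received above is laid out as
 *
 *   <digest: dgs bytes><block data: data_size - dgs bytes>
 *
 * so a 4 KiB write arrives with data_size == 4096 + 20; once the digest
 * has been read into dig_in, the remaining 4096 bytes are received into
 * the page chain and checked against it via drbd_csum_ee().
 */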
1528
1529 /* drbd_drain_block() just takes a data block
1530  * out of the socket input buffer, and discards it.
1531  */
1532 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1533 {
1534         struct page *page;
1535         int err = 0;
1536         void *data;
1537
1538         if (!data_size)
1539                 return 0;
1540
1541         page = drbd_alloc_pages(mdev, 1, 1);
1542
1543         data = kmap(page);
1544         while (data_size) {
1545                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1546
1547                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1548                 if (err)
1549                         break;
1550                 data_size -= len;
1551         }
1552         kunmap(page);
1553         drbd_free_pages(mdev, page, 0);
1554         return err;
1555 }
1556
1557 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1558                            sector_t sector, int data_size)
1559 {
1560         struct bio_vec *bvec;
1561         struct bio *bio;
1562         int dgs, err, i, expect;
1563         void *dig_in = mdev->tconn->int_dig_in;
1564         void *dig_vv = mdev->tconn->int_dig_vv;
1565
1566         dgs = 0;
1567         if (mdev->tconn->peer_integrity_tfm) {
1568                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1569                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1570                 if (err)
1571                         return err;
1572                 data_size -= dgs;
1573         }
1574
1575         /* optimistically update recv_cnt.  if receiving fails below,
1576          * we disconnect anyways, and counters will be reset. */
1577         mdev->recv_cnt += data_size>>9;
1578
1579         bio = req->master_bio;
1580         D_ASSERT(sector == bio->bi_sector);
1581
1582         bio_for_each_segment(bvec, bio, i) {
1583                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1584                 expect = min_t(int, data_size, bvec->bv_len);
1585                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1586                 kunmap(bvec->bv_page);
1587                 if (err)
1588                         return err;
1589                 data_size -= expect;
1590         }
1591
1592         if (dgs) {
1593                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1594                 if (memcmp(dig_in, dig_vv, dgs)) {
1595                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1596                         return -EINVAL;
1597                 }
1598         }
1599
1600         D_ASSERT(data_size == 0);
1601         return 0;
1602 }
1603
1604 /*
1605  * e_end_resync_block() is called in asender context via
1606  * drbd_finish_peer_reqs().
1607  */
1608 static int e_end_resync_block(struct drbd_work *w, int unused)
1609 {
1610         struct drbd_peer_request *peer_req =
1611                 container_of(w, struct drbd_peer_request, w);
1612         struct drbd_conf *mdev = w->mdev;
1613         sector_t sector = peer_req->i.sector;
1614         int err;
1615
1616         D_ASSERT(drbd_interval_empty(&peer_req->i));
1617
1618         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1619                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1620                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1621         } else {
1622                 /* Record failure to sync */
1623                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1624
1625                 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1626         }
1627         dec_unacked(mdev);
1628
1629         return err;
1630 }
1631
1632 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1633 {
1634         struct drbd_peer_request *peer_req;
1635
1636         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1637         if (!peer_req)
1638                 goto fail;
1639
1640         dec_rs_pending(mdev);
1641
1642         inc_unacked(mdev);
1643         /* corresponding dec_unacked() in e_end_resync_block()
1644          * respective _drbd_clear_done_ee */
1645
1646         peer_req->w.cb = e_end_resync_block;
1647
1648         spin_lock_irq(&mdev->tconn->req_lock);
1649         list_add(&peer_req->w.list, &mdev->sync_ee);
1650         spin_unlock_irq(&mdev->tconn->req_lock);
1651
1652         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1653         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1654                 return 0;
1655
1656         /* don't care for the reason here */
1657         dev_err(DEV, "submit failed, triggering re-connect\n");
1658         spin_lock_irq(&mdev->tconn->req_lock);
1659         list_del(&peer_req->w.list);
1660         spin_unlock_irq(&mdev->tconn->req_lock);
1661
1662         drbd_free_peer_req(mdev, peer_req);
1663 fail:
1664         put_ldev(mdev);
1665         return -EIO;
1666 }
1667
1668 static struct drbd_request *
1669 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1670              sector_t sector, bool missing_ok, const char *func)
1671 {
1672         struct drbd_request *req;
1673
1674         /* Request object according to our peer */
1675         req = (struct drbd_request *)(unsigned long)id;
1676         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1677                 return req;
1678         if (!missing_ok) {
1679                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1680                         (unsigned long)id, (unsigned long long)sector);
1681         }
1682         return NULL;
1683 }
1684
1685 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1686 {
1687         struct drbd_conf *mdev;
1688         struct drbd_request *req;
1689         sector_t sector;
1690         int err;
1691         struct p_data *p = pi->data;
1692
1693         mdev = vnr_to_mdev(tconn, pi->vnr);
1694         if (!mdev)
1695                 return -EIO;
1696
1697         sector = be64_to_cpu(p->sector);
1698
1699         spin_lock_irq(&mdev->tconn->req_lock);
1700         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1701         spin_unlock_irq(&mdev->tconn->req_lock);
1702         if (unlikely(!req))
1703                 return -EIO;
1704
1705         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1706          * special casing it there for the various failure cases.
1707          * still no race with drbd_fail_pending_reads */
1708         err = recv_dless_read(mdev, req, sector, pi->size);
1709         if (!err)
1710                 req_mod(req, DATA_RECEIVED);
1711         /* else: nothing. handled from drbd_disconnect...
1712          * I don't think we may complete this just yet
1713          * in case we are "on-disconnect: freeze" */
1714
1715         return err;
1716 }
1717
1718 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1719 {
1720         struct drbd_conf *mdev;
1721         sector_t sector;
1722         int err;
1723         struct p_data *p = pi->data;
1724
1725         mdev = vnr_to_mdev(tconn, pi->vnr);
1726         if (!mdev)
1727                 return -EIO;
1728
1729         sector = be64_to_cpu(p->sector);
1730         D_ASSERT(p->block_id == ID_SYNCER);
1731
1732         if (get_ldev(mdev)) {
1733                 /* data is submitted to disk within recv_resync_read.
1734                  * corresponding put_ldev done below on error,
1735                  * or in drbd_peer_request_endio. */
1736                 err = recv_resync_read(mdev, sector, pi->size);
1737         } else {
1738                 if (__ratelimit(&drbd_ratelimit_state))
1739                         dev_err(DEV, "Can not write resync data to local disk.\n");
1740
1741                 err = drbd_drain_block(mdev, pi->size);
1742
1743                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1744         }
1745
1746         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1747
1748         return err;
1749 }
1750
1751 static int w_restart_write(struct drbd_work *w, int cancel)
1752 {
1753         struct drbd_request *req = container_of(w, struct drbd_request, w);
1754         struct drbd_conf *mdev = w->mdev;
1755         struct bio *bio;
1756         unsigned long start_time;
1757         unsigned long flags;
1758
1759         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1760         if (!expect(req->rq_state & RQ_POSTPONED)) {
1761                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1762                 return -EIO;
1763         }
1764         bio = req->master_bio;
1765         start_time = req->start_time;
1766         /* Postponed requests will not have their master_bio completed!  */
1767         __req_mod(req, DISCARD_WRITE, NULL);
1768         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1769
1770         while (__drbd_make_request(mdev, bio, start_time))
1771                 /* retry */ ;
1772         return 0;
1773 }
1774
1775 static void restart_conflicting_writes(struct drbd_conf *mdev,
1776                                        sector_t sector, int size)
1777 {
1778         struct drbd_interval *i;
1779         struct drbd_request *req;
1780
1781         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1782                 if (!i->local)
1783                         continue;
1784                 req = container_of(i, struct drbd_request, i);
1785                 if (req->rq_state & RQ_LOCAL_PENDING ||
1786                     !(req->rq_state & RQ_POSTPONED))
1787                         continue;
1788                 if (expect(list_empty(&req->w.list))) {
1789                         req->w.mdev = mdev;
1790                         req->w.cb = w_restart_write;
1791                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1792                 }
1793         }
1794 }
1795
1796 /*
1797  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1798  */
1799 static int e_end_block(struct drbd_work *w, int cancel)
1800 {
1801         struct drbd_peer_request *peer_req =
1802                 container_of(w, struct drbd_peer_request, w);
1803         struct drbd_conf *mdev = w->mdev;
1804         sector_t sector = peer_req->i.sector;
1805         int err = 0, pcmd;
1806
1807         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1808                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1809                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1810                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1811                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1812                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1813                         err = drbd_send_ack(mdev, pcmd, peer_req);
1814                         if (pcmd == P_RS_WRITE_ACK)
1815                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1816                 } else {
1817                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1818                         /* we expect it to be marked out of sync anyways...
1819                          * maybe assert this?  */
1820                 }
1821                 dec_unacked(mdev);
1822         }
1823         /* we delete from the conflict detection hash _after_ we sent out the
1824          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1825         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1826                 spin_lock_irq(&mdev->tconn->req_lock);
1827                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1828                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1829                 if (peer_req->flags & EE_RESTART_REQUESTS)
1830                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1831                 spin_unlock_irq(&mdev->tconn->req_lock);
1832         } else
1833                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1834
1835         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1836
1837         return err;
1838 }
1839
1840 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1841 {
1842         struct drbd_conf *mdev = w->mdev;
1843         struct drbd_peer_request *peer_req =
1844                 container_of(w, struct drbd_peer_request, w);
1845         int err;
1846
1847         err = drbd_send_ack(mdev, ack, peer_req);
1848         dec_unacked(mdev);
1849
1850         return err;
1851 }
1852
1853 static int e_send_discard_write(struct drbd_work *w, int unused)
1854 {
1855         return e_send_ack(w, P_DISCARD_WRITE);
1856 }
1857
1858 static int e_send_retry_write(struct drbd_work *w, int unused)
1859 {
1860         struct drbd_tconn *tconn = w->mdev->tconn;
1861
1862         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1863                              P_RETRY_WRITE : P_DISCARD_WRITE);
1864 }
1865
1866 static bool seq_greater(u32 a, u32 b)
1867 {
1868         /*
1869          * We assume 32-bit wrap-around here.
1870          * For 24-bit wrap-around, we would have to shift:
1871          *  a <<= 8; b <<= 8;
1872          */
1873         return (s32)a - (s32)b > 0;
1874 }
1875
1876 static u32 seq_max(u32 a, u32 b)
1877 {
1878         return seq_greater(a, b) ? a : b;
1879 }
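
/*
 * Worked example of the wrap-around comparison above: for a == 0x00000001
 * and b == 0xfffffff0, (s32)a - (s32)b == 1 - (-16) == 17 > 0, so
 * seq_greater(a, b) is true and seq_max() returns a, even though a is
 * numerically smaller than b.
 */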
1880
1881 static bool need_peer_seq(struct drbd_conf *mdev)
1882 {
1883         struct drbd_tconn *tconn = mdev->tconn;
1884         int tp;
1885
1886         /*
1887          * We only need to keep track of the last packet_seq number of our peer
1888          * if we are in dual-primary mode and we have the discard flag set; see
1889          * handle_write_conflicts().
1890          */
1891
1892         rcu_read_lock();
1893         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1894         rcu_read_unlock();
1895
1896         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1897 }
1898
1899 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1900 {
1901         unsigned int newest_peer_seq;
1902
1903         if (need_peer_seq(mdev)) {
1904                 spin_lock(&mdev->peer_seq_lock);
1905                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1906                 mdev->peer_seq = newest_peer_seq;
1907                 spin_unlock(&mdev->peer_seq_lock);
1908                 /* wake up only if we actually changed mdev->peer_seq */
1909                 if (peer_seq == newest_peer_seq)
1910                         wake_up(&mdev->seq_wait);
1911         }
1912 }
1913
1914 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1915 {
1916         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1917 }
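
/*
 * Example: overlaps(8, 4096, 15, 512) is true, since the first range
 * covers sectors 8..15 (4096 bytes == 8 sectors) and the second covers
 * sector 15 only, so neither range ends before the other starts.
 */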
1918
1919 /* maybe change sync_ee into interval trees as well? */
1920 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1921 {
1922         struct drbd_peer_request *rs_req;
1923         bool rv = 0;
1924
1925         spin_lock_irq(&mdev->tconn->req_lock);
1926         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1927                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1928                              rs_req->i.sector, rs_req->i.size)) {
1929                         rv = 1;
1930                         break;
1931                 }
1932         }
1933         spin_unlock_irq(&mdev->tconn->req_lock);
1934
1935         if (rv)
1936                 dev_warn(DEV, "WARN: Avoiding concurrent data/resync writes to the same sectors.\n");
1937
1938         return rv;
1939 }
1940
1941 /* Called from receive_Data.
1942  * Synchronize packets on sock with packets on msock.
1943  *
1944  * This is here so that even when a P_DATA packet traveling via sock overtakes
1945  * an Ack packet traveling on msock, they are still processed in the order
1946  * they were sent.
1947  *
1948  * Note: we don't care for Ack packets overtaking P_DATA packets.
1949  *
1950  * In case packet_seq is larger than mdev->peer_seq number, there are
1951  * outstanding packets on the msock. We wait for them to arrive.
1952  * In case we are the logically next packet, we update mdev->peer_seq
1953  * ourselves. Correctly handles 32bit wrap around.
1954  *
1955  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1956  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1957  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1958  * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
1959  *
1960  * returns 0 if we may process the packet,
1961  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1962 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1963 {
1964         DEFINE_WAIT(wait);
1965         long timeout;
1966         int ret;
1967
1968         if (!need_peer_seq(mdev))
1969                 return 0;
1970
1971         spin_lock(&mdev->peer_seq_lock);
1972         for (;;) {
1973                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1974                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1975                         ret = 0;
1976                         break;
1977                 }
1978                 if (signal_pending(current)) {
1979                         ret = -ERESTARTSYS;
1980                         break;
1981                 }
1982                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1983                 spin_unlock(&mdev->peer_seq_lock);
1984                 rcu_read_lock();
1985                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1986                 rcu_read_unlock();
1987                 timeout = schedule_timeout(timeout);
1988                 spin_lock(&mdev->peer_seq_lock);
1989                 if (!timeout) {
1990                         ret = -ETIMEDOUT;
1991                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1992                         break;
1993                 }
1994         }
1995         spin_unlock(&mdev->peer_seq_lock);
1996         finish_wait(&mdev->seq_wait, &wait);
1997         return ret;
1998 }
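
/*
 * The wrap-around estimate above, spelled out: 10 GBit/s is roughly
 * 2^30 bytes/s, i.e. at most 2^21 512-byte sectors (and thus at most
 * 2^21 sequence number increments) per second.  A 24 bit counter then
 * wraps after 2^24 / 2^21 == 2^3 == 8 seconds, a full 32 bit counter
 * only after 2^32 / 2^21 == 2^11 == 2048 seconds.
 */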
1999
2000 /* see also bio_flags_to_wire()
2001  * We need to map the wire DP_* flags to bio REQ_* flags semantically, and back,
2002  * because the peer may be running a different kernel version. */
2003 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2004 {
2005         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2006                 (dpf & DP_FUA ? REQ_FUA : 0) |
2007                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2008                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2009 }
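
/*
 * Example: a peer write sent with DP_RW_SYNC | DP_FUA is resubmitted
 * locally with REQ_SYNC | REQ_FUA or'ed into the bio rw flags, on top
 * of the WRITE bit that receive_Data() already put into its rw variable.
 */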
2010
2011 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2012                                     unsigned int size)
2013 {
2014         struct drbd_interval *i;
2015
2016     repeat:
2017         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2018                 struct drbd_request *req;
2019                 struct bio_and_error m;
2020
2021                 if (!i->local)
2022                         continue;
2023                 req = container_of(i, struct drbd_request, i);
2024                 if (!(req->rq_state & RQ_POSTPONED))
2025                         continue;
2026                 req->rq_state &= ~RQ_POSTPONED;
2027                 __req_mod(req, NEG_ACKED, &m);
2028                 spin_unlock_irq(&mdev->tconn->req_lock);
2029                 if (m.bio)
2030                         complete_master_bio(mdev, &m);
2031                 spin_lock_irq(&mdev->tconn->req_lock);
2032                 goto repeat;
2033         }
2034 }
2035
2036 static int handle_write_conflicts(struct drbd_conf *mdev,
2037                                   struct drbd_peer_request *peer_req)
2038 {
2039         struct drbd_tconn *tconn = mdev->tconn;
2040         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
2041         sector_t sector = peer_req->i.sector;
2042         const unsigned int size = peer_req->i.size;
2043         struct drbd_interval *i;
2044         bool equal;
2045         int err;
2046
2047         /*
2048          * Inserting the peer request into the write_requests tree will prevent
2049          * new conflicting local requests from being added.
2050          */
2051         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2052
2053     repeat:
2054         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2055                 if (i == &peer_req->i)
2056                         continue;
2057
2058                 if (!i->local) {
2059                         /*
2060                          * Our peer has sent a conflicting remote request; this
2061                          * should not happen in a two-node setup.  Wait for the
2062                          * earlier peer request to complete.
2063                          */
2064                         err = drbd_wait_misc(mdev, i);
2065                         if (err)
2066                                 goto out;
2067                         goto repeat;
2068                 }
2069
2070                 equal = i->sector == sector && i->size == size;
2071                 if (resolve_conflicts) {
2072                         /*
2073                          * If the peer request is fully contained within the
2074                          * overlapping request, it can be discarded; otherwise,
2075                          * it will be retried once all overlapping requests
2076                          * have completed.
2077                          */
2078                         bool discard = i->sector <= sector && i->sector +
2079                                        (i->size >> 9) >= sector + (size >> 9);
2080
2081                         if (!equal)
2082                                 dev_alert(DEV, "Concurrent writes detected: "
2083                                                "local=%llus +%u, remote=%llus +%u, "
2084                                                "assuming %s came first\n",
2085                                           (unsigned long long)i->sector, i->size,
2086                                           (unsigned long long)sector, size,
2087                                           discard ? "local" : "remote");
2088
2089                         inc_unacked(mdev);
2090                         peer_req->w.cb = discard ? e_send_discard_write :
2091                                                    e_send_retry_write;
2092                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2093                         wake_asender(mdev->tconn);
2094
2095                         err = -ENOENT;
2096                         goto out;
2097                 } else {
2098                         struct drbd_request *req =
2099                                 container_of(i, struct drbd_request, i);
2100
2101                         if (!equal)
2102                                 dev_alert(DEV, "Concurrent writes detected: "
2103                                                "local=%llus +%u, remote=%llus +%u\n",
2104                                           (unsigned long long)i->sector, i->size,
2105                                           (unsigned long long)sector, size);
2106
2107                         if (req->rq_state & RQ_LOCAL_PENDING ||
2108                             !(req->rq_state & RQ_POSTPONED)) {
2109                                 /*
2110                                  * Wait for the node with the discard flag to
2111                                  * decide if this request will be discarded or
2112                                  * retried.  Requests that are discarded will
2113                                  * disappear from the write_requests tree.
2114                                  *
2115                                  * In addition, wait for the conflicting
2116                                  * request to finish locally before submitting
2117                                  * the conflicting peer request.
2118                                  */
2119                                 err = drbd_wait_misc(mdev, &req->i);
2120                                 if (err) {
2121                                         _conn_request_state(mdev->tconn,
2122                                                             NS(conn, C_TIMEOUT),
2123                                                             CS_HARD);
2124                                         fail_postponed_requests(mdev, sector, size);
2125                                         goto out;
2126                                 }
2127                                 goto repeat;
2128                         }
2129                         /*
2130                          * Remember to restart the conflicting requests after
2131                          * the new peer request has completed.
2132                          */
2133                         peer_req->flags |= EE_RESTART_REQUESTS;
2134                 }
2135         }
2136         err = 0;
2137
2138     out:
2139         if (err)
2140                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2141         return err;
2142 }
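
/*
 * Example of the "discard" decision above: a local request covering
 * sectors 0..15 (i->sector == 0, i->size == 8192) fully contains a
 * conflicting 4 KiB peer write at sector 4 (sector == 4, size == 4096),
 * so the peer request is answered via e_send_discard_write(); a peer
 * write sticking out of the local request would instead be answered via
 * e_send_retry_write(), telling the peer to retry it later.
 */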
2143
2144 /* mirrored write */
2145 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2146 {
2147         struct drbd_conf *mdev;
2148         sector_t sector;
2149         struct drbd_peer_request *peer_req;
2150         struct p_data *p = pi->data;
2151         u32 peer_seq = be32_to_cpu(p->seq_num);
2152         int rw = WRITE;
2153         u32 dp_flags;
2154         int err, tp;
2155
2156         mdev = vnr_to_mdev(tconn, pi->vnr);
2157         if (!mdev)
2158                 return -EIO;
2159
2160         if (!get_ldev(mdev)) {
2161                 int err2;
2162
2163                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2164                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2165                 atomic_inc(&tconn->current_epoch->epoch_size);
2166                 err2 = drbd_drain_block(mdev, pi->size);
2167                 if (!err)
2168                         err = err2;
2169                 return err;
2170         }
2171
2172         /*
2173          * Corresponding put_ldev done either below (on various errors), or in
2174          * drbd_peer_request_endio, if we successfully submit the data at the
2175          * end of this function.
2176          */
2177
2178         sector = be64_to_cpu(p->sector);
2179         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2180         if (!peer_req) {
2181                 put_ldev(mdev);
2182                 return -EIO;
2183         }
2184
2185         peer_req->w.cb = e_end_block;
2186
2187         dp_flags = be32_to_cpu(p->dp_flags);
2188         rw |= wire_flags_to_bio(mdev, dp_flags);
2189
2190         if (dp_flags & DP_MAY_SET_IN_SYNC)
2191                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2192
2193         spin_lock(&tconn->epoch_lock);
2194         peer_req->epoch = tconn->current_epoch;
2195         atomic_inc(&peer_req->epoch->epoch_size);
2196         atomic_inc(&peer_req->epoch->active);
2197         spin_unlock(&tconn->epoch_lock);
2198
2199         rcu_read_lock();
2200         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2201         rcu_read_unlock();
2202         if (tp) {
2203                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2204                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2205                 if (err)
2206                         goto out_interrupted;
2207                 spin_lock_irq(&mdev->tconn->req_lock);
2208                 err = handle_write_conflicts(mdev, peer_req);
2209                 if (err) {
2210                         spin_unlock_irq(&mdev->tconn->req_lock);
2211                         if (err == -ENOENT) {
2212                                 put_ldev(mdev);
2213                                 return 0;
2214                         }
2215                         goto out_interrupted;
2216                 }
2217         } else
2218                 spin_lock_irq(&mdev->tconn->req_lock);
2219         list_add(&peer_req->w.list, &mdev->active_ee);
2220         spin_unlock_irq(&mdev->tconn->req_lock);
2221
2222         if (mdev->state.conn == C_SYNC_TARGET)
2223                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2224
2225         if (mdev->tconn->agreed_pro_version < 100) {
2226                 rcu_read_lock();
2227                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2228                 case DRBD_PROT_C:
2229                         dp_flags |= DP_SEND_WRITE_ACK;
2230                         break;
2231                 case DRBD_PROT_B:
2232                         dp_flags |= DP_SEND_RECEIVE_ACK;
2233                         break;
2234                 }
2235                 rcu_read_unlock();
2236         }
2237
2238         if (dp_flags & DP_SEND_WRITE_ACK) {
2239                 peer_req->flags |= EE_SEND_WRITE_ACK;
2240                 inc_unacked(mdev);
2241                 /* corresponding dec_unacked() in e_end_block()
2242                  * respective _drbd_clear_done_ee */
2243         }
2244
2245         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2246                 /* I really don't like it that the receiver thread
2247                  * sends on the msock, but anyways */
2248                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2249         }
2250
2251         if (mdev->state.pdsk < D_INCONSISTENT) {
2252                 /* In case we have the only disk of the cluster: mark the range out of sync and cover it by the activity log. */
2253                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2254                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2255                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2256                 drbd_al_begin_io(mdev, &peer_req->i);
2257         }
2258
2259         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2260         if (!err)
2261                 return 0;
2262
2263         /* don't care for the reason here */
2264         dev_err(DEV, "submit failed, triggering re-connect\n");
2265         spin_lock_irq(&mdev->tconn->req_lock);
2266         list_del(&peer_req->w.list);
2267         drbd_remove_epoch_entry_interval(mdev, peer_req);
2268         spin_unlock_irq(&mdev->tconn->req_lock);
2269         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2270                 drbd_al_complete_io(mdev, &peer_req->i);
2271
2272 out_interrupted:
2273         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2274         put_ldev(mdev);
2275         drbd_free_peer_req(mdev, peer_req);
2276         return err;
2277 }
2278
2279 /* We may throttle resync, if the lower device seems to be busy,
2280  * and current sync rate is above c_min_rate.
2281  *
2282  * To decide whether or not the lower device is busy, we use a scheme similar
2283  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2284  * activity (more than 64 sectors) that we cannot account for with our own
2285  * resync activity, the device obviously is "busy".
2286  *
2287  * The current sync rate used here uses only the most recent two step marks,
2288  * to have a short time average so we can react faster.
2289  */
2290 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2291 {
2292         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2293         unsigned long db, dt, dbdt;
2294         struct lc_element *tmp;
2295         int curr_events;
2296         int throttle = 0;
2297         unsigned int c_min_rate;
2298
2299         rcu_read_lock();
2300         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2301         rcu_read_unlock();
2302
2303         /* feature disabled? */
2304         if (c_min_rate == 0)
2305                 return 0;
2306
2307         spin_lock_irq(&mdev->al_lock);
2308         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2309         if (tmp) {
2310                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2311                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2312                         spin_unlock_irq(&mdev->al_lock);
2313                         return 0;
2314                 }
2315                 /* Do not slow down if app IO is already waiting for this extent */
2316         }
2317         spin_unlock_irq(&mdev->al_lock);
2318
2319         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2320                       (int)part_stat_read(&disk->part0, sectors[1]) -
2321                         atomic_read(&mdev->rs_sect_ev);
2322
2323         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2324                 unsigned long rs_left;
2325                 int i;
2326
2327                 mdev->rs_last_events = curr_events;
2328
2329                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2330                  * approx. */
2331                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2332
2333                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2334                         rs_left = mdev->ov_left;
2335                 else
2336                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2337
2338                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2339                 if (!dt)
2340                         dt++;
2341                 db = mdev->rs_mark_left[i] - rs_left;
2342                 dbdt = Bit2KB(db/dt);
2343
2344                 if (dbdt > c_min_rate)
2345                         throttle = 1;
2346         }
2347         return throttle;
2348 }
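
/*
 * Throttle arithmetic, for illustration (assuming the usual 4 KiB per
 * bitmap bit behind Bit2KB()): if the chosen sync mark is dt == 2 seconds
 * old and db == 2048 bitmap bits were resolved since then,
 * dbdt == Bit2KB(2048 / 2) == 4096 KiB/s; with c_min_rate configured
 * below that, e.g. 250, the request gets throttled.
 */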
2349
2350
2351 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2352 {
2353         struct drbd_conf *mdev;
2354         sector_t sector;
2355         sector_t capacity;
2356         struct drbd_peer_request *peer_req;
2357         struct digest_info *di = NULL;
2358         int size, verb;
2359         unsigned int fault_type;
2360         struct p_block_req *p = pi->data;
2361
2362         mdev = vnr_to_mdev(tconn, pi->vnr);
2363         if (!mdev)
2364                 return -EIO;
2365         capacity = drbd_get_capacity(mdev->this_bdev);
2366
2367         sector = be64_to_cpu(p->sector);
2368         size   = be32_to_cpu(p->blksize);
2369
2370         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2371                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2372                                 (unsigned long long)sector, size);
2373                 return -EINVAL;
2374         }
2375         if (sector + (size>>9) > capacity) {
2376                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2377                                 (unsigned long long)sector, size);
2378                 return -EINVAL;
2379         }
2380
2381         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2382                 verb = 1;
2383                 switch (pi->cmd) {
2384                 case P_DATA_REQUEST:
2385                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2386                         break;
2387                 case P_RS_DATA_REQUEST:
2388                 case P_CSUM_RS_REQUEST:
2389                 case P_OV_REQUEST:
2390                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2391                         break;
2392                 case P_OV_REPLY:
2393                         verb = 0;
2394                         dec_rs_pending(mdev);
2395                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2396                         break;
2397                 default:
2398                         BUG();
2399                 }
2400                 if (verb && __ratelimit(&drbd_ratelimit_state))
2401                         dev_err(DEV, "Can not satisfy peer's read request, "
2402                             "no local data.\n");
2403
2404                 /* drain the payload, if any */
2405                 return drbd_drain_block(mdev, pi->size);
2406         }
2407
2408         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2409          * "criss-cross" setup, that might cause write-out on some other DRBD,
2410          * which in turn might block on the other node at this very place.  */
2411         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2412         if (!peer_req) {
2413                 put_ldev(mdev);
2414                 return -ENOMEM;
2415         }
2416
2417         switch (pi->cmd) {
2418         case P_DATA_REQUEST:
2419                 peer_req->w.cb = w_e_end_data_req;
2420                 fault_type = DRBD_FAULT_DT_RD;
2421                 /* application IO, don't drbd_rs_begin_io */
2422                 goto submit;
2423
2424         case P_RS_DATA_REQUEST:
2425                 peer_req->w.cb = w_e_end_rsdata_req;
2426                 fault_type = DRBD_FAULT_RS_RD;
2427                 /* used in the sector offset progress display */
2428                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2429                 break;
2430
2431         case P_OV_REPLY:
2432         case P_CSUM_RS_REQUEST:
2433                 fault_type = DRBD_FAULT_RS_RD;
2434                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2435                 if (!di)
2436                         goto out_free_e;
2437
2438                 di->digest_size = pi->size;
2439                 di->digest = (((char *)di)+sizeof(struct digest_info));
2440
2441                 peer_req->digest = di;
2442                 peer_req->flags |= EE_HAS_DIGEST;
2443
2444                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2445                         goto out_free_e;
2446
2447                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2448                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2449                         peer_req->w.cb = w_e_end_csum_rs_req;
2450                         /* used in the sector offset progress display */
2451                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2452                 } else if (pi->cmd == P_OV_REPLY) {
2453                         /* track progress, we may need to throttle */
2454                         atomic_add(size >> 9, &mdev->rs_sect_in);
2455                         peer_req->w.cb = w_e_end_ov_reply;
2456                         dec_rs_pending(mdev);
2457                         /* drbd_rs_begin_io done when we sent this request,
2458                          * but accounting still needs to be done. */
2459                         goto submit_for_resync;
2460                 }
2461                 break;
2462
2463         case P_OV_REQUEST:
2464                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2465                     mdev->tconn->agreed_pro_version >= 90) {
2466                         unsigned long now = jiffies;
2467                         int i;
2468                         mdev->ov_start_sector = sector;
2469                         mdev->ov_position = sector;
2470                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2471                         mdev->rs_total = mdev->ov_left;
2472                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2473                                 mdev->rs_mark_left[i] = mdev->ov_left;
2474                                 mdev->rs_mark_time[i] = now;
2475                         }
2476                         dev_info(DEV, "Online Verify start sector: %llu\n",
2477                                         (unsigned long long)sector);
2478                 }
2479                 peer_req->w.cb = w_e_end_ov_req;
2480                 fault_type = DRBD_FAULT_RS_RD;
2481                 break;
2482
2483         default:
2484                 BUG();
2485         }
2486
2487         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2488          * wrt the receiver, but it is not as straightforward as it may seem.
2489          * Various places in the resync start and stop logic assume resync
2490          * requests are processed in order, requeuing this on the worker thread
2491          * introduces a bunch of new code for synchronization between threads.
2492          *
2493          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2494          * "forever", throttling after drbd_rs_begin_io will lock that extent
2495          * for application writes for the same time.  For now, just throttle
2496          * here, where the rest of the code expects the receiver to sleep for
2497          * a while, anyways.
2498          */
2499
2500         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2501          * this defers syncer requests for some time, before letting at least
2502          * one request through.  The resync controller on the receiving side
2503          * will adapt to the incoming rate accordingly.
2504          *
2505          * We cannot throttle here if remote is Primary/SyncTarget:
2506          * we would also throttle its application reads.
2507          * In that case, throttling is done on the SyncTarget only.
2508          */
2509         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2510                 schedule_timeout_uninterruptible(HZ/10);
2511         if (drbd_rs_begin_io(mdev, sector))
2512                 goto out_free_e;
2513
2514 submit_for_resync:
2515         atomic_add(size >> 9, &mdev->rs_sect_ev);
2516
2517 submit:
2518         inc_unacked(mdev);
2519         spin_lock_irq(&mdev->tconn->req_lock);
2520         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2521         spin_unlock_irq(&mdev->tconn->req_lock);
2522
2523         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2524                 return 0;
2525
2526         /* don't care for the reason here */
2527         dev_err(DEV, "submit failed, triggering re-connect\n");
2528         spin_lock_irq(&mdev->tconn->req_lock);
2529         list_del(&peer_req->w.list);
2530         spin_unlock_irq(&mdev->tconn->req_lock);
2531         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2532
2533 out_free_e:
2534         put_ldev(mdev);
2535         drbd_free_peer_req(mdev, peer_req);
2536         return -EIO;
2537 }
2538
2539 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2540 {
2541         int self, peer, rv = -100;
2542         unsigned long ch_self, ch_peer;
2543         enum drbd_after_sb_p after_sb_0p;
2544
2545         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2546         peer = mdev->p_uuid[UI_BITMAP] & 1;
2547
2548         ch_peer = mdev->p_uuid[UI_SIZE];
2549         ch_self = mdev->comm_bm_set;
2550
2551         rcu_read_lock();
2552         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2553         rcu_read_unlock();
2554         switch (after_sb_0p) {
2555         case ASB_CONSENSUS:
2556         case ASB_DISCARD_SECONDARY:
2557         case ASB_CALL_HELPER:
2558         case ASB_VIOLENTLY:
2559                 dev_err(DEV, "Configuration error.\n");
2560                 break;
2561         case ASB_DISCONNECT:
2562                 break;
2563         case ASB_DISCARD_YOUNGER_PRI:
2564                 if (self == 0 && peer == 1) {
2565                         rv = -1;
2566                         break;
2567                 }
2568                 if (self == 1 && peer == 0) {
2569                         rv =  1;
2570                         break;
2571                 }
2572                 /* Else fall through to one of the other strategies... */
2573         case ASB_DISCARD_OLDER_PRI:
2574                 if (self == 0 && peer == 1) {
2575                         rv = 1;
2576                         break;
2577                 }
2578                 if (self == 1 && peer == 0) {
2579                         rv = -1;
2580                         break;
2581                 }
2582                 /* Else fall through to one of the other strategies... */
2583                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2584                      "Using discard-least-changes instead\n");
2585         case ASB_DISCARD_ZERO_CHG:
2586                 if (ch_peer == 0 && ch_self == 0) {
2587                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2588                                 ? -1 : 1;
2589                         break;
2590                 } else {
2591                         if (ch_peer == 0) { rv =  1; break; }
2592                         if (ch_self == 0) { rv = -1; break; }
2593                 }
2594                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2595                         break;
2596         case ASB_DISCARD_LEAST_CHG:
2597                 if      (ch_self < ch_peer)
2598                         rv = -1;
2599                 else if (ch_self > ch_peer)
2600                         rv =  1;
2601                 else /* ( ch_self == ch_peer ) */
2602                      /* Well, then use something else. */
2603                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2604                                 ? -1 : 1;
2605                 break;
2606         case ASB_DISCARD_LOCAL:
2607                 rv = -1;
2608                 break;
2609         case ASB_DISCARD_REMOTE:
2610                 rv =  1;
2611         }
2612
2613         return rv;
2614 }
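
/*
 * Example for ASB_DISCARD_LEAST_CHG above: if ch_self == 100 while the
 * peer reports ch_peer == 5000, ch_self < ch_peer yields rv == -1, i.e.
 * the local node discards its (fewer) changes and becomes SyncTarget;
 * with equal counts, the DISCARD_CONCURRENT flag breaks the tie.
 */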
2615
2616 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2617 {
2618         int hg, rv = -100;
2619         enum drbd_after_sb_p after_sb_1p;
2620
2621         rcu_read_lock();
2622         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2623         rcu_read_unlock();
2624         switch (after_sb_1p) {
2625         case ASB_DISCARD_YOUNGER_PRI:
2626         case ASB_DISCARD_OLDER_PRI:
2627         case ASB_DISCARD_LEAST_CHG:
2628         case ASB_DISCARD_LOCAL:
2629         case ASB_DISCARD_REMOTE:
2630         case ASB_DISCARD_ZERO_CHG:
2631                 dev_err(DEV, "Configuration error.\n");
2632                 break;
2633         case ASB_DISCONNECT:
2634                 break;
2635         case ASB_CONSENSUS:
2636                 hg = drbd_asb_recover_0p(mdev);
2637                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2638                         rv = hg;
2639                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2640                         rv = hg;
2641                 break;
2642         case ASB_VIOLENTLY:
2643                 rv = drbd_asb_recover_0p(mdev);
2644                 break;
2645         case ASB_DISCARD_SECONDARY:
2646                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2647         case ASB_CALL_HELPER:
2648                 hg = drbd_asb_recover_0p(mdev);
2649                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2650                         enum drbd_state_rv rv2;
2651
2652                         drbd_set_role(mdev, R_SECONDARY, 0);
2653                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2654                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2655                           * we do not need to wait for the after state change work either. */
2656                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2657                         if (rv2 != SS_SUCCESS) {
2658                                 drbd_khelper(mdev, "pri-lost-after-sb");
2659                         } else {
2660                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2661                                 rv = hg;
2662                         }
2663                 } else
2664                         rv = hg;
2665         }
2666
2667         return rv;
2668 }
2669
2670 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2671 {
2672         int hg, rv = -100;
2673         enum drbd_after_sb_p after_sb_2p;
2674
2675         rcu_read_lock();
2676         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2677         rcu_read_unlock();
2678         switch (after_sb_2p) {
2679         case ASB_DISCARD_YOUNGER_PRI:
2680         case ASB_DISCARD_OLDER_PRI:
2681         case ASB_DISCARD_LEAST_CHG:
2682         case ASB_DISCARD_LOCAL:
2683         case ASB_DISCARD_REMOTE:
2684         case ASB_CONSENSUS:
2685         case ASB_DISCARD_SECONDARY:
2686         case ASB_DISCARD_ZERO_CHG:
2687                 dev_err(DEV, "Configuration error.\n");
2688                 break;
2689         case ASB_VIOLENTLY:
2690                 rv = drbd_asb_recover_0p(mdev);
2691                 break;
2692         case ASB_DISCONNECT:
2693                 break;
2694         case ASB_CALL_HELPER:
2695                 hg = drbd_asb_recover_0p(mdev);
2696                 if (hg == -1) {
2697                         enum drbd_state_rv rv2;
2698
2699                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2700                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2701                           * we do not need to wait for the after state change work either. */
2702                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2703                         if (rv2 != SS_SUCCESS) {
2704                                 drbd_khelper(mdev, "pri-lost-after-sb");
2705                         } else {
2706                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2707                                 rv = hg;
2708                         }
2709                 } else
2710                         rv = hg;
2711         }
2712
2713         return rv;
2714 }
2715
2716 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2717                            u64 bits, u64 flags)
2718 {
2719         if (!uuid) {
2720                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2721                 return;
2722         }
2723         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2724              text,
2725              (unsigned long long)uuid[UI_CURRENT],
2726              (unsigned long long)uuid[UI_BITMAP],
2727              (unsigned long long)uuid[UI_HISTORY_START],
2728              (unsigned long long)uuid[UI_HISTORY_END],
2729              (unsigned long long)bits,
2730              (unsigned long long)flags);
2731 }
2732
2733 /*
2734   100   after split brain try auto recover
2735     2   C_SYNC_SOURCE set BitMap
2736     1   C_SYNC_SOURCE use BitMap
2737     0   no Sync
2738    -1   C_SYNC_TARGET use BitMap
2739    -2   C_SYNC_TARGET set BitMap
2740  -100   after split brain, disconnect
2741 -1000   unrelated data
2742 -1091   requires proto 91
2743 -1096   requires proto 96
2744  */
2745 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2746 {
2747         u64 self, peer;
2748         int i, j;
2749
2750         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2751         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2752
2753         *rule_nr = 10;
2754         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2755                 return 0;
2756
2757         *rule_nr = 20;
2758         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2759              peer != UUID_JUST_CREATED)
2760                 return -2;
2761
2762         *rule_nr = 30;
2763         if (self != UUID_JUST_CREATED &&
2764             (peer == UUID_JUST_CREATED || peer == (u64)0))
2765                 return 2;
2766
2767         if (self == peer) {
2768                 int rct, dc; /* roles at crash time */
2769
2770                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2771
2772                         if (mdev->tconn->agreed_pro_version < 91)
2773                                 return -1091;
2774
2775                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2776                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2777                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2778                                 drbd_uuid_set_bm(mdev, 0UL);
2779
2780                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2781                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2782                                 *rule_nr = 34;
2783                         } else {
2784                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2785                                 *rule_nr = 36;
2786                         }
2787
2788                         return 1;
2789                 }
2790
2791                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2792
2793                         if (mdev->tconn->agreed_pro_version < 91)
2794                                 return -1091;
2795
2796                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2797                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2798                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2799
2800                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2801                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2802                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2803
2804                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2805                                 *rule_nr = 35;
2806                         } else {
2807                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2808                                 *rule_nr = 37;
2809                         }
2810
2811                         return -1;
2812                 }
2813
2814                 /* Common power [off|failure] */
2815                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2816                         (mdev->p_uuid[UI_FLAGS] & 2);
2817                 /* lowest bit is set when we were primary,
2818                  * next bit (weight 2) is set when peer was primary */
2819                 *rule_nr = 40;
2820
2821                 switch (rct) {
2822                 case 0: /* !self_pri && !peer_pri */ return 0;
2823                 case 1: /*  self_pri && !peer_pri */ return 1;
2824                 case 2: /* !self_pri &&  peer_pri */ return -1;
2825                 case 3: /*  self_pri &&  peer_pri */
2826                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2827                         return dc ? -1 : 1;
2828                 }
2829         }
2830
2831         *rule_nr = 50;
2832         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2833         if (self == peer)
2834                 return -1;
2835
2836         *rule_nr = 51;
2837         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2838         if (self == peer) {
2839                 if (mdev->tconn->agreed_pro_version < 96 ?
2840                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2841                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2842                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2843                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2844                            modifications from its last start of a resync as sync source. */
2845
2846                         if (mdev->tconn->agreed_pro_version < 91)
2847                                 return -1091;
2848
2849                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2850                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2851
2852                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2853                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2854
2855                         return -1;
2856                 }
2857         }
2858
2859         *rule_nr = 60;
2860         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2861         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2862                 peer = mdev->p_uuid[i] & ~((u64)1);
2863                 if (self == peer)
2864                         return -2;
2865         }
2866
2867         *rule_nr = 70;
2868         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2869         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2870         if (self == peer)
2871                 return 1;
2872
2873         *rule_nr = 71;
2874         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2875         if (self == peer) {
2876                 if (mdev->tconn->agreed_pro_version < 96 ?
2877                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2878                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2879                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2880                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2881                            modifications from our last start of a resync as sync source. */
2882
2883                         if (mdev->tconn->agreed_pro_version < 91)
2884                                 return -1091;
2885
2886                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2887                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2888
2889                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2890                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2891                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2892
2893                         return 1;
2894                 }
2895         }
2896
2897
2898         *rule_nr = 80;
2899         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2900         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2901                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2902                 if (self == peer)
2903                         return 2;
2904         }
2905
2906         *rule_nr = 90;
2907         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2908         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2909         if (self == peer && self != ((u64)0))
2910                 return 100;
2911
2912         *rule_nr = 100;
2913         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2914                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2915                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2916                         peer = mdev->p_uuid[j] & ~((u64)1);
2917                         if (self == peer)
2918                                 return -100;
2919                 }
2920         }
2921
2922         return -1000;
2923 }
2924
2925 /* drbd_sync_handshake() returns the new conn state on success, or
2926    C_MASK (-1) on failure.
2927  */
2928 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2929                                            enum drbd_disk_state peer_disk) __must_hold(local)
2930 {
2931         enum drbd_conns rv = C_MASK;
2932         enum drbd_disk_state mydisk;
2933         struct net_conf *nc;
2934         int hg, rule_nr, rr_conflict, tentative;
2935
2936         mydisk = mdev->state.disk;
2937         if (mydisk == D_NEGOTIATING)
2938                 mydisk = mdev->new_state_tmp.disk;
2939
2940         dev_info(DEV, "drbd_sync_handshake:\n");
2941         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2942         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2943                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2944
2945         hg = drbd_uuid_compare(mdev, &rule_nr);
2946
2947         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2948
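             /* hg > 0: this node becomes sync source, hg < 0: sync target,
              * hg == 0: no resync needed.  |hg| >= 2 means a full sync,
              * +/-100 indicates split brain, -1000 unrelated data, and
              * values below -1000 encode a minimum required protocol
              * version (-hg - 1000) that both sides would need. */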
2949         if (hg == -1000) {
2950                 dev_alert(DEV, "Unrelated data, aborting!\n");
2951                 return C_MASK;
2952         }
2953         if (hg < -1000) {
2954                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2955                 return C_MASK;
2956         }
2957
2958         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2959             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2960                 int f = (hg == -100) || abs(hg) == 2;
2961                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2962                 if (f)
2963                         hg = hg*2;
2964                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2965                      hg > 0 ? "source" : "target");
2966         }
2967
2968         if (abs(hg) == 100)
2969                 drbd_khelper(mdev, "initial-split-brain");
2970
2971         rcu_read_lock();
2972         nc = rcu_dereference(mdev->tconn->net_conf);
2973
2974         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2975                 int pcount = (mdev->state.role == R_PRIMARY)
2976                            + (peer_role == R_PRIMARY);
2977                 int forced = (hg == -100);
2978
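                     /* Pick the after-split-brain recovery policy that matches
                      * the number of nodes currently in the primary role. */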
2979                 switch (pcount) {
2980                 case 0:
2981                         hg = drbd_asb_recover_0p(mdev);
2982                         break;
2983                 case 1:
2984                         hg = drbd_asb_recover_1p(mdev);
2985                         break;
2986                 case 2:
2987                         hg = drbd_asb_recover_2p(mdev);
2988                         break;
2989                 }
2990                 if (abs(hg) < 100) {
2991                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2992                              "automatically solved. Sync from %s node\n",
2993                              pcount, (hg < 0) ? "peer" : "this");
2994                         if (forced) {
2995                                 dev_warn(DEV, "Doing a full sync, since"
2996                                      " UUIDs were ambiguous.\n");
2997                                 hg = hg*2;
2998                         }
2999                 }
3000         }
3001
3002         if (hg == -100) {
3003                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3004                         hg = -1;
3005                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3006                         hg = 1;
3007
3008                 if (abs(hg) < 100)
3009                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3010                              "Sync from %s node\n",
3011                              (hg < 0) ? "peer" : "this");
3012         }
3013         rr_conflict = nc->rr_conflict;
3014         tentative = nc->tentative;
3015         rcu_read_unlock();
3016
3017         if (hg == -100) {
3018                 /* FIXME this log message is not correct if we end up here
3019                  * after an attempted attach on a diskless node.
3020                  * We just refuse to attach -- well, we drop the "connection"
3021                  * to that disk, in a way... */
3022                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3023                 drbd_khelper(mdev, "split-brain");
3024                 return C_MASK;
3025         }
3026
3027         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3028                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3029                 return C_MASK;
3030         }
3031
3032         if (hg < 0 && /* by intention we do not use mydisk here. */
3033             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3034                 switch (rr_conflict) {
3035                 case ASB_CALL_HELPER:
3036                         drbd_khelper(mdev, "pri-lost");
3037                         /* fall through */
3038                 case ASB_DISCONNECT:
3039                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3040                         return C_MASK;
3041                 case ASB_VIOLENTLY:
3042                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3043                              " assumption\n");
3044                 }
3045         }
3046
3047         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3048                 if (hg == 0)
3049                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3050                 else
3051                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3052                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3053                                  abs(hg) >= 2 ? "full" : "bit-map based");
3054                 return C_MASK;
3055         }
3056
3057         if (abs(hg) >= 2) {
3058                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3059                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3060                                         BM_LOCKED_SET_ALLOWED))
3061                         return C_MASK;
3062         }
3063
3064         if (hg > 0) { /* become sync source. */
3065                 rv = C_WF_BITMAP_S;
3066         } else if (hg < 0) { /* become sync target */
3067                 rv = C_WF_BITMAP_T;
3068         } else {
3069                 rv = C_CONNECTED;
3070                 if (drbd_bm_total_weight(mdev)) {
3071                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3072                              drbd_bm_total_weight(mdev));
3073                 }
3074         }
3075
3076         return rv;
3077 }
3078
3079 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3080 {
3081         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3082         if (peer == ASB_DISCARD_REMOTE)
3083                 return ASB_DISCARD_LOCAL;
3084
3085         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3086         if (peer == ASB_DISCARD_LOCAL)
3087                 return ASB_DISCARD_REMOTE;
3088
3089         /* everything else is valid if they are equal on both sides. */
3090         return peer;
3091 }
3092
3093 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3094 {
3095         struct p_protocol *p = pi->data;
3096         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3097         int p_proto, p_discard_my_data, p_two_primaries, cf;
3098         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3099         char integrity_alg[SHARED_SECRET_MAX] = "";
3100         struct crypto_hash *peer_integrity_tfm = NULL;
3101         void *int_dig_in = NULL, *int_dig_vv = NULL;
3102
3103         p_proto         = be32_to_cpu(p->protocol);
3104         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3105         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3106         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3107         p_two_primaries = be32_to_cpu(p->two_primaries);
3108         cf              = be32_to_cpu(p->conn_flags);
3109         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3110
3111         if (tconn->agreed_pro_version >= 87) {
3112                 int err;
3113
3114                 if (pi->size > sizeof(integrity_alg))
3115                         return -EIO;
3116                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3117                 if (err)
3118                         return err;
3119                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3120         }
3121
3122         if (pi->cmd != P_PROTOCOL_UPDATE) {
3123                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3124
3125                 if (cf & CF_DRY_RUN)
3126                         set_bit(CONN_DRY_RUN, &tconn->flags);
3127
3128                 rcu_read_lock();
3129                 nc = rcu_dereference(tconn->net_conf);
3130
3131                 if (p_proto != nc->wire_protocol) {
3132                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3133                         goto disconnect_rcu_unlock;
3134                 }
3135
3136                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3137                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3138                         goto disconnect_rcu_unlock;
3139                 }
3140
3141                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3142                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3143                         goto disconnect_rcu_unlock;
3144                 }
3145
3146                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3147                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3148                         goto disconnect_rcu_unlock;
3149                 }
3150
3151                 if (p_discard_my_data && nc->discard_my_data) {
3152                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3153                         goto disconnect_rcu_unlock;
3154                 }
3155
3156                 if (p_two_primaries != nc->two_primaries) {
3157                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3158                         goto disconnect_rcu_unlock;
3159                 }
3160
3161                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3162                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3163                         goto disconnect_rcu_unlock;
3164                 }
3165
3166                 rcu_read_unlock();
3167         }
3168
3169         if (integrity_alg[0]) {
3170                 int hash_size;
3171
3172                 /*
3173                  * We can only change the peer data integrity algorithm
3174                  * here.  Changing our own data integrity algorithm
3175                  * requires that we send a P_PROTOCOL_UPDATE packet at
3176                  * the same time; otherwise, the peer has no way to
3177                  * tell between which packets the algorithm should
3178                  * change.
3179                  */
3180
3181                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3182                 if (!peer_integrity_tfm) {
3183                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3184                                  integrity_alg);
3185                         goto disconnect;
3186                 }
3187
3188                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3189                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3190                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3191                 if (!(int_dig_in && int_dig_vv)) {
3192                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3193                         goto disconnect;
3194                 }
3195         }
3196
3197         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3198         if (!new_net_conf) {
3199                 conn_err(tconn, "Allocation of new net_conf failed\n");
3200                 goto disconnect;
3201         }
3202
3203         mutex_lock(&tconn->data.mutex);
3204         mutex_lock(&tconn->conf_update);
3205         old_net_conf = tconn->net_conf;
3206         *new_net_conf = *old_net_conf;
3207
3208         new_net_conf->wire_protocol = p_proto;
3209         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3210         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3211         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3212         new_net_conf->two_primaries = p_two_primaries;
3213
3214         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3215         mutex_unlock(&tconn->conf_update);
3216         mutex_unlock(&tconn->data.mutex);
3217
3218         crypto_free_hash(tconn->peer_integrity_tfm);
3219         kfree(tconn->int_dig_in);
3220         kfree(tconn->int_dig_vv);
3221         tconn->peer_integrity_tfm = peer_integrity_tfm;
3222         tconn->int_dig_in = int_dig_in;
3223         tconn->int_dig_vv = int_dig_vv;
3224
3225         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3226                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3227                           integrity_alg[0] ? integrity_alg : "(none)");
3228
3229         synchronize_rcu();
3230         kfree(old_net_conf);
3231         return 0;
3232
3233 disconnect_rcu_unlock:
3234         rcu_read_unlock();
3235 disconnect:
3236         crypto_free_hash(peer_integrity_tfm);
3237         kfree(int_dig_in);
3238         kfree(int_dig_vv);
3239         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3240         return -EIO;
3241 }
3242
3243 /* helper function
3244  * input: alg name, feature name
3245  * return: NULL (alg name was "")
3246  *         ERR_PTR(error) if something goes wrong
3247  *         or the crypto hash ptr, if it worked out ok. */
3248 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3249                 const char *alg, const char *name)
3250 {
3251         struct crypto_hash *tfm;
3252
3253         if (!alg[0])
3254                 return NULL;
3255
3256         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3257         if (IS_ERR(tfm)) {
3258                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3259                         alg, name, PTR_ERR(tfm));
3260                 return tfm;
3261         }
3262         return tfm;
3263 }
3264
3265 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3266 {
3267         void *buffer = tconn->data.rbuf;
3268         int size = pi->size;
3269
3270         while (size) {
3271                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3272                 s = drbd_recv(tconn, buffer, s);
3273                 if (s <= 0) {
3274                         if (s < 0)
3275                                 return s;
3276                         break;
3277                 }
3278                 size -= s;
3279         }
3280         if (size)
3281                 return -EIO;
3282         return 0;
3283 }
3284
3285 /*
3286  * config_unknown_volume  -  device configuration command for unknown volume
3287  *
3288  * When a device is added to an existing connection, the node on which the
3289  * device is added first will send configuration commands to its peer but the
3290  * peer will not know about the device yet.  It will warn and ignore these
3291  * commands.  Once the device is added on the second node, the second node will
3292  * send the same device configuration commands, but in the other direction.
3293  *
3294  * (We can also end up here if drbd is misconfigured.)
3295  */
3296 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3297 {
3298         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3299                   cmdname(pi->cmd), pi->vnr);
3300         return ignore_remaining_packet(tconn, pi);
3301 }
3302
3303 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3304 {
3305         struct drbd_conf *mdev;
3306         struct p_rs_param_95 *p;
3307         unsigned int header_size, data_size, exp_max_sz;
3308         struct crypto_hash *verify_tfm = NULL;
3309         struct crypto_hash *csums_tfm = NULL;
3310         struct net_conf *old_net_conf, *new_net_conf = NULL;
3311         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3312         const int apv = tconn->agreed_pro_version;
3313         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3314         int fifo_size = 0;
3315         int err;
3316
3317         mdev = vnr_to_mdev(tconn, pi->vnr);
3318         if (!mdev)
3319                 return config_unknown_volume(tconn, pi);
3320
3321         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3322                     : apv == 88 ? sizeof(struct p_rs_param)
3323                                         + SHARED_SECRET_MAX
3324                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3325                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3326
3327         if (pi->size > exp_max_sz) {
3328                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3329                     pi->size, exp_max_sz);
3330                 return -EIO;
3331         }
3332
3333         if (apv <= 88) {
3334                 header_size = sizeof(struct p_rs_param);
3335                 data_size = pi->size - header_size;
3336         } else if (apv <= 94) {
3337                 header_size = sizeof(struct p_rs_param_89);
3338                 data_size = pi->size - header_size;
3339                 D_ASSERT(data_size == 0);
3340         } else {
3341                 header_size = sizeof(struct p_rs_param_95);
3342                 data_size = pi->size - header_size;
3343                 D_ASSERT(data_size == 0);
3344         }
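             /* Only apv 88 sends the verify-alg name as extra payload after the
              * fixed-size header; apv <= 87 sends no algorithm names at all, and
              * apv >= 89 carries verify_alg and csums_alg inside the fixed-size
              * parameter struct itself. */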
3345
3346         /* initialize verify_alg and csums_alg */
3347         p = pi->data;
3348         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3349
3350         err = drbd_recv_all(mdev->tconn, p, header_size);
3351         if (err)
3352                 return err;
3353
3354         mutex_lock(&mdev->tconn->conf_update);
3355         old_net_conf = mdev->tconn->net_conf;
3356         if (get_ldev(mdev)) {
3357                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3358                 if (!new_disk_conf) {
3359                         put_ldev(mdev);
3360                         mutex_unlock(&mdev->tconn->conf_update);
3361                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3362                         return -ENOMEM;
3363                 }
3364
3365                 old_disk_conf = mdev->ldev->disk_conf;
3366                 *new_disk_conf = *old_disk_conf;
3367
3368                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3369         }
3370
3371         if (apv >= 88) {
3372                 if (apv == 88) {
3373                         if (data_size > SHARED_SECRET_MAX) {
3374                                 dev_err(DEV, "verify-alg too long, "
3375                                     "peer wants %u, accepting only %u bytes\n",
3376                                                 data_size, SHARED_SECRET_MAX);
3377                                 err = -EIO;
3378                                 goto reconnect;
3379                         }
3380
3381                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3382                         if (err)
3383                                 goto reconnect;
3384                         /* we expect NUL terminated string */
3385                         /* but just in case someone tries to be evil */
3386                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3387                         p->verify_alg[data_size-1] = 0;
3388
3389                 } else /* apv >= 89 */ {
3390                         /* we still expect NUL terminated strings */
3391                         /* but just in case someone tries to be evil */
3392                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3393                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3394                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3395                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3396                 }
3397
3398                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3399                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3400                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3401                                     old_net_conf->verify_alg, p->verify_alg);
3402                                 goto disconnect;
3403                         }
3404                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3405                                         p->verify_alg, "verify-alg");
3406                         if (IS_ERR(verify_tfm)) {
3407                                 verify_tfm = NULL;
3408                                 goto disconnect;
3409                         }
3410                 }
3411
3412                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3413                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3414                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3415                                     old_net_conf->csums_alg, p->csums_alg);
3416                                 goto disconnect;
3417                         }
3418                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3419                                         p->csums_alg, "csums-alg");
3420                         if (IS_ERR(csums_tfm)) {
3421                                 csums_tfm = NULL;
3422                                 goto disconnect;
3423                         }
3424                 }
3425
3426                 if (apv > 94 && new_disk_conf) {
3427                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3428                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3429                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3430                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3431
3432                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3433                         if (fifo_size != mdev->rs_plan_s->size) {
3434                                 new_plan = fifo_alloc(fifo_size);
3435                                 if (!new_plan) {
3436                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3437                                         put_ldev(mdev);
3438                                         goto disconnect;
3439                                 }
3440                         }
3441                 }
3442
3443                 if (verify_tfm || csums_tfm) {
3444                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3445                         if (!new_net_conf) {
3446                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3447                                 goto disconnect;
3448                         }
3449
3450                         *new_net_conf = *old_net_conf;
3451
3452                         if (verify_tfm) {
3453                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3454                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3455                                 crypto_free_hash(mdev->tconn->verify_tfm);
3456                                 mdev->tconn->verify_tfm = verify_tfm;
3457                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3458                         }
3459                         if (csums_tfm) {
3460                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3461                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3462                                 crypto_free_hash(mdev->tconn->csums_tfm);
3463                                 mdev->tconn->csums_tfm = csums_tfm;
3464                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3465                         }
3466                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3467                 }
3468         }
3469
3470         if (new_disk_conf) {
3471                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3472                 put_ldev(mdev);
3473         }
3474
3475         if (new_plan) {
3476                 old_plan = mdev->rs_plan_s;
3477                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3478         }
3479
3480         mutex_unlock(&mdev->tconn->conf_update);
3481         synchronize_rcu();
3482         if (new_net_conf)
3483                 kfree(old_net_conf);
3484         kfree(old_disk_conf);
3485         kfree(old_plan);
3486
3487         return 0;
3488
3489 reconnect:
3490         if (new_disk_conf) {
3491                 put_ldev(mdev);
3492                 kfree(new_disk_conf);
3493         }
3494         mutex_unlock(&mdev->tconn->conf_update);
3495         return -EIO;
3496
3497 disconnect:
3498         kfree(new_plan);
3499         if (new_disk_conf) {
3500                 put_ldev(mdev);
3501                 kfree(new_disk_conf);
3502         }
3503         mutex_unlock(&mdev->tconn->conf_update);
3504         /* just for completeness: actually not needed,
3505          * as this is not reached if csums_tfm was ok. */
3506         crypto_free_hash(csums_tfm);
3507         /* but free the verify_tfm again, if csums_tfm did not work out */
3508         crypto_free_hash(verify_tfm);
3509         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3510         return -EIO;
3511 }
3512
3513 /* warn if the arguments differ by more than 12.5% */
3514 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3515         const char *s, sector_t a, sector_t b)
3516 {
3517         sector_t d;
3518         if (a == 0 || b == 0)
3519                 return;
3520         d = (a > b) ? (a - b) : (b - a);
3521         if (d > (a>>3) || d > (b>>3))
3522                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3523                      (unsigned long long)a, (unsigned long long)b);
3524 }
3525
3526 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3527 {
3528         struct drbd_conf *mdev;
3529         struct p_sizes *p = pi->data;
3530         enum determine_dev_size dd = unchanged;
3531         sector_t p_size, p_usize, my_usize;
3532         int ldsc = 0; /* local disk size changed */
3533         enum dds_flags ddsf;
3534
3535         mdev = vnr_to_mdev(tconn, pi->vnr);
3536         if (!mdev)
3537                 return config_unknown_volume(tconn, pi);
3538
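             /* d_size is the capacity of the peer's backing device; u_size is
              * the size limit the user configured on the peer (0 if unset,
              * see the min_not_zero() use below). */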
3539         p_size = be64_to_cpu(p->d_size);
3540         p_usize = be64_to_cpu(p->u_size);
3541
3542         /* just store the peer's disk size for now.
3543          * we still need to figure out whether we accept that. */
3544         mdev->p_size = p_size;
3545
3546         if (get_ldev(mdev)) {
3547                 rcu_read_lock();
3548                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3549                 rcu_read_unlock();
3550
3551                 warn_if_differ_considerably(mdev, "lower level device sizes",
3552                            p_size, drbd_get_max_capacity(mdev->ldev));
3553                 warn_if_differ_considerably(mdev, "user requested size",
3554                                             p_usize, my_usize);
3555
3556                 /* if this is the first connect, or an otherwise expected
3557                  * param exchange, choose the minimum */
3558                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3559                         p_usize = min_not_zero(my_usize, p_usize);
3560
3561                 /* Never shrink a device with usable data during connect.
3562                    But allow online shrinking if we are connected. */
3563                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3564                     drbd_get_capacity(mdev->this_bdev) &&
3565                     mdev->state.disk >= D_OUTDATED &&
3566                     mdev->state.conn < C_CONNECTED) {
3567                         dev_err(DEV, "The peer's disk size is too small!\n");
3568                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3569                         put_ldev(mdev);
3570                         return -EIO;
3571                 }
3572
3573                 if (my_usize != p_usize) {
3574                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3575
3576                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3577                         if (!new_disk_conf) {
3578                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3579                                 put_ldev(mdev);
3580                                 return -ENOMEM;
3581                         }
3582
3583                         mutex_lock(&mdev->tconn->conf_update);
3584                         old_disk_conf = mdev->ldev->disk_conf;
3585                         *new_disk_conf = *old_disk_conf;
3586                         new_disk_conf->disk_size = p_usize;
3587
3588                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3589                         mutex_unlock(&mdev->tconn->conf_update);
3590                         synchronize_rcu();
3591                         kfree(old_disk_conf);
3592
3593                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3594                                  (unsigned long)p_usize);
3595                 }
3596
3597                 put_ldev(mdev);
3598         }
3599
3600         ddsf = be16_to_cpu(p->dds_flags);
3601         if (get_ldev(mdev)) {
3602                 dd = drbd_determine_dev_size(mdev, ddsf);
3603                 put_ldev(mdev);
3604                 if (dd == dev_size_error)
3605                         return -EIO;
3606                 drbd_md_sync(mdev);
3607         } else {
3608                 /* I am diskless, need to accept the peer's size. */
3609                 drbd_set_my_capacity(mdev, p_size);
3610         }
3611
3612         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3613         drbd_reconsider_max_bio_size(mdev);
3614
3615         if (get_ldev(mdev)) {
3616                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3617                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3618                         ldsc = 1;
3619                 }
3620
3621                 put_ldev(mdev);
3622         }
3623
3624         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3625                 if (be64_to_cpu(p->c_size) !=
3626                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3627                         /* we have different sizes, probably peer
3628                          * needs to know my new size... */
3629                         drbd_send_sizes(mdev, 0, ddsf);
3630                 }
3631                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3632                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3633                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3634                             mdev->state.disk >= D_INCONSISTENT) {
3635                                 if (ddsf & DDSF_NO_RESYNC)
3636                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3637                                 else
3638                                         resync_after_online_grow(mdev);
3639                         } else
3640                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3641                 }
3642         }
3643
3644         return 0;
3645 }
3646
3647 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3648 {
3649         struct drbd_conf *mdev;
3650         struct p_uuids *p = pi->data;
3651         u64 *p_uuid;
3652         int i, updated_uuids = 0;
3653
3654         mdev = vnr_to_mdev(tconn, pi->vnr);
3655         if (!mdev)
3656                 return config_unknown_volume(tconn, pi);
3657
3658         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }
3659
3660         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3661                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3662
3663         kfree(mdev->p_uuid);
3664         mdev->p_uuid = p_uuid;
3665
3666         if (mdev->state.conn < C_CONNECTED &&
3667             mdev->state.disk < D_INCONSISTENT &&
3668             mdev->state.role == R_PRIMARY &&
3669             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3670                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3671                     (unsigned long long)mdev->ed_uuid);
3672                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3673                 return -EIO;
3674         }
3675
3676         if (get_ldev(mdev)) {
3677                 int skip_initial_sync =
3678                         mdev->state.conn == C_CONNECTED &&
3679                         mdev->tconn->agreed_pro_version >= 90 &&
3680                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3681                         (p_uuid[UI_FLAGS] & 8);
3682                 if (skip_initial_sync) {
3683                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3684                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3685                                         "clear_n_write from receive_uuids",
3686                                         BM_LOCKED_TEST_ALLOWED);
3687                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3688                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3689                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3690                                         CS_VERBOSE, NULL);
3691                         drbd_md_sync(mdev);
3692                         updated_uuids = 1;
3693                 }
3694                 put_ldev(mdev);
3695         } else if (mdev->state.disk < D_INCONSISTENT &&
3696                    mdev->state.role == R_PRIMARY) {
3697                 /* I am a diskless primary, the peer just created a new current UUID
3698                    for me. */
3699                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3700         }
3701
3702         /* Before we test for the disk state, we should wait until a possibly
3703            ongoing cluster-wide state change has finished. That is important if
3704            we are primary and are detaching from our disk. We need to see the
3705            new disk state... */
3706         mutex_lock(mdev->state_mutex);
3707         mutex_unlock(mdev->state_mutex);
3708         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3709                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3710
3711         if (updated_uuids)
3712                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3713
3714         return 0;
3715 }
3716
3717 /**
3718  * convert_state() - Converts the peer's view of the cluster state to our point of view
3719  * @ps:         The state as seen by the peer.
3720  */
3721 static union drbd_state convert_state(union drbd_state ps)
3722 {
3723         union drbd_state ms;
3724
3725         static enum drbd_conns c_tab[] = {
3726                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3727                 [C_CONNECTED] = C_CONNECTED,
3728
3729                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3730                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3731                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3732                 [C_VERIFY_S]       = C_VERIFY_T,
3733                 [C_MASK]   = C_MASK,
3734         };
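             /*
              * Example: a peer-reported state of (role=Primary, peer=Secondary,
              * disk=UpToDate, pdsk=Inconsistent) becomes, from our point of view,
              * (role=Secondary, peer=Primary, disk=Inconsistent, pdsk=UpToDate).
              */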
3735
3736         ms.i = ps.i;
3737
3738         ms.conn = c_tab[ps.conn];
3739         ms.peer = ps.role;
3740         ms.role = ps.peer;
3741         ms.pdsk = ps.disk;
3742         ms.disk = ps.pdsk;
3743         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3744
3745         return ms;
3746 }
3747
3748 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3749 {
3750         struct drbd_conf *mdev;
3751         struct p_req_state *p = pi->data;
3752         union drbd_state mask, val;
3753         enum drbd_state_rv rv;
3754
3755         mdev = vnr_to_mdev(tconn, pi->vnr);
3756         if (!mdev)
3757                 return -EIO;
3758
3759         mask.i = be32_to_cpu(p->mask);
3760         val.i = be32_to_cpu(p->val);
3761
3762         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3763             mutex_is_locked(mdev->state_mutex)) {
3764                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3765                 return 0;
3766         }
3767
3768         mask = convert_state(mask);
3769         val = convert_state(val);
3770
3771         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3772         drbd_send_sr_reply(mdev, rv);
3773
3774         drbd_md_sync(mdev);
3775
3776         return 0;
3777 }
3778
3779 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3780 {
3781         struct p_req_state *p = pi->data;
3782         union drbd_state mask, val;
3783         enum drbd_state_rv rv;
3784
3785         mask.i = be32_to_cpu(p->mask);
3786         val.i = be32_to_cpu(p->val);
3787
3788         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3789             mutex_is_locked(&tconn->cstate_mutex)) {
3790                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3791                 return 0;
3792         }
3793
3794         mask = convert_state(mask);
3795         val = convert_state(val);
3796
3797         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3798         conn_send_sr_reply(tconn, rv);
3799
3800         return 0;
3801 }
3802
3803 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3804 {
3805         struct drbd_conf *mdev;
3806         struct p_state *p = pi->data;
3807         union drbd_state os, ns, peer_state;
3808         enum drbd_disk_state real_peer_disk;
3809         enum chg_state_flags cs_flags;
3810         int rv;
3811
3812         mdev = vnr_to_mdev(tconn, pi->vnr);
3813         if (!mdev)
3814                 return config_unknown_volume(tconn, pi);
3815
3816         peer_state.i = be32_to_cpu(p->state);
3817
3818         real_peer_disk = peer_state.disk;
3819         if (peer_state.disk == D_NEGOTIATING) {
3820                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3821                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3822         }
3823
3824         spin_lock_irq(&mdev->tconn->req_lock);
3825  retry:
3826         os = ns = drbd_read_state(mdev);
3827         spin_unlock_irq(&mdev->tconn->req_lock);
3828
3829         /* If some other part of the code (asender thread, timeout)
3830          * already decided to close the connection again,
3831          * we must not "re-establish" it here. */
3832         if (os.conn <= C_TEAR_DOWN)
3833                 return false;
3834
3835         /* If this is the "end of sync" confirmation, usually the peer disk
3836          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty resync
3837            * (0 bits set) started in PausedSyncT, or if the timing of pause-/
3838          * unpause-sync events has been "just right", the peer disk may
3839          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3840          */
3841         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3842             real_peer_disk == D_UP_TO_DATE &&
3843             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3844                 /* If we are (becoming) SyncSource, but peer is still in sync
3845                  * preparation, ignore its uptodate-ness to avoid flapping, it
3846                  * will change to inconsistent once the peer reaches active
3847                  * syncing states.
3848                  * It may have changed syncer-paused flags, however, so we
3849                  * cannot ignore this completely. */
3850                 if (peer_state.conn > C_CONNECTED &&
3851                     peer_state.conn < C_SYNC_SOURCE)
3852                         real_peer_disk = D_INCONSISTENT;
3853
3854                 /* if peer_state changes to connected at the same time,
3855                  * it explicitly notifies us that it finished resync.
3856                  * Maybe we should finish it up, too? */
3857                 else if (os.conn >= C_SYNC_SOURCE &&
3858                          peer_state.conn == C_CONNECTED) {
3859                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3860                                 drbd_resync_finished(mdev);
3861                         return 0;
3862                 }
3863         }
3864
3865         /* peer says his disk is inconsistent, while we think it is uptodate,
3866          * and this happens while the peer still thinks we have a sync going on,
3867          * but we think we are already done with the sync.
3868          * We ignore this to avoid flapping pdsk.
3869          * This should not happen, if the peer is a recent version of drbd. */
3870         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3871             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3872                 real_peer_disk = D_UP_TO_DATE;
3873
3874         if (ns.conn == C_WF_REPORT_PARAMS)
3875                 ns.conn = C_CONNECTED;
3876
3877         if (peer_state.conn == C_AHEAD)
3878                 ns.conn = C_BEHIND;
3879
3880         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3881             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3882                 int cr; /* consider resync */
3883
3884                 /* if we established a new connection */
3885                 cr  = (os.conn < C_CONNECTED);
3886                 /* if we had an established connection
3887                  * and one of the nodes newly attaches a disk */
3888                 cr |= (os.conn == C_CONNECTED &&
3889                        (peer_state.disk == D_NEGOTIATING ||
3890                         os.disk == D_NEGOTIATING));
3891                 /* if we have both been inconsistent, and the peer has been
3892                  * forced to be UpToDate with --overwrite-data */
3893                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3894                 /* if we had been plain connected, and the admin requested to
3895                  * start a sync by "invalidate" or "invalidate-remote" */
3896                 cr |= (os.conn == C_CONNECTED &&
3897                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3898                                  peer_state.conn <= C_WF_BITMAP_T));
3899
3900                 if (cr)
3901                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3902
3903                 put_ldev(mdev);
3904                 if (ns.conn == C_MASK) {
3905                         ns.conn = C_CONNECTED;
3906                         if (mdev->state.disk == D_NEGOTIATING) {
3907                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3908                         } else if (peer_state.disk == D_NEGOTIATING) {
3909                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3910                                 peer_state.disk = D_DISKLESS;
3911                                 real_peer_disk = D_DISKLESS;
3912                         } else {
3913                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3914                                         return -EIO;
3915                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3916                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3917                                 return -EIO;
3918                         }
3919                 }
3920         }
3921
3922         spin_lock_irq(&mdev->tconn->req_lock);
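             /* The state was sampled and evaluated above without holding
              * req_lock; if it changed in the meantime, redo the evaluation. */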
3923         if (os.i != drbd_read_state(mdev).i)
3924                 goto retry;
3925         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3926         ns.peer = peer_state.role;
3927         ns.pdsk = real_peer_disk;
3928         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3929         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3930                 ns.disk = mdev->new_state_tmp.disk;
3931         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3932         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3933             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3934                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3935                    for temporary network outages! */
3936                 spin_unlock_irq(&mdev->tconn->req_lock);
3937                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3938                 tl_clear(mdev->tconn);
3939                 drbd_uuid_new_current(mdev);
3940                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3941                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3942                 return -EIO;
3943         }
3944         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3945         ns = drbd_read_state(mdev);
3946         spin_unlock_irq(&mdev->tconn->req_lock);
3947
3948         if (rv < SS_SUCCESS) {
3949                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3950                 return -EIO;
3951         }
3952
3953         if (os.conn > C_WF_REPORT_PARAMS) {
3954                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3955                     peer_state.disk != D_NEGOTIATING ) {
3956                         /* we want resync, peer has not yet decided to sync... */
3957                         /* Nowadays only used when forcing a node into primary role and
3958                            setting its disk to UpToDate with that */
3959                         drbd_send_uuids(mdev);
3960                         drbd_send_current_state(mdev);
3961                 }
3962         }
3963
3964         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3965
3966         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3967
3968         return 0;
3969 }
3970
3971 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3972 {
3973         struct drbd_conf *mdev;
3974         struct p_rs_uuid *p = pi->data;
3975
3976         mdev = vnr_to_mdev(tconn, pi->vnr);
3977         if (!mdev)
3978                 return -EIO;
3979
3980         wait_event(mdev->misc_wait,
3981                    mdev->state.conn == C_WF_SYNC_UUID ||
3982                    mdev->state.conn == C_BEHIND ||
3983                    mdev->state.conn < C_CONNECTED ||
3984                    mdev->state.disk < D_NEGOTIATING);
3985
3986         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3987
3988         /* Here the _drbd_uuid_ functions are right; the current UUID should
3989            _not_ be rotated into the history. */
3990         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3991                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3992                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3993
3994                 drbd_print_uuids(mdev, "updated sync uuid");
3995                 drbd_start_resync(mdev, C_SYNC_TARGET);
3996
3997                 put_ldev(mdev);
3998         } else
3999                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4000
4001         return 0;
4002 }
4003
4004 /**
4005  * receive_bitmap_plain
4006  *
4007  * Return 0 when done, 1 when another iteration is needed, and a negative error
4008  * code upon failure.
4009  */
4010 static int
4011 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4012                      unsigned long *p, struct bm_xfer_ctx *c)
4013 {
4014         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4015                                  drbd_header_size(mdev->tconn);
4016         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4017                                        c->bm_words - c->word_offset);
4018         unsigned int want = num_words * sizeof(*p);
4019         int err;
4020
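             /* "want" is the number of payload bytes this packet should carry:
              * as many whole longs as fit into one socket buffer after the
              * header, capped by the bitmap words that are still missing. */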
4021         if (want != size) {
4022                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4023                 return -EIO;
4024         }
4025         if (want == 0)
4026                 return 0;
4027         err = drbd_recv_all(mdev->tconn, p, want);
4028         if (err)
4029                 return err;
4030
4031         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4032
4033         c->word_offset += num_words;
4034         c->bit_offset = c->word_offset * BITS_PER_LONG;
4035         if (c->bit_offset > c->bm_bits)
4036                 c->bit_offset = c->bm_bits;
4037
4038         return 1;
4039 }
4040
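     /* Layout of the "encoding" byte of a compressed bitmap packet:
      * bits 0-3: bitmap encoding (enum drbd_bitmap_code),
      * bits 4-6: number of padding bits of the bit stream,
      * bit    7: value of the first run, see recv_bm_rle_bits(). */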
4041 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4042 {
4043         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4044 }
4045
4046 static int dcbp_get_start(struct p_compressed_bm *p)
4047 {
4048         return (p->encoding & 0x80) != 0;
4049 }
4050
4051 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4052 {
4053         return (p->encoding >> 4) & 0x7;
4054 }
4055
4056 /**
4057  * recv_bm_rle_bits
4058  *
4059  * Return 0 when done, 1 when another iteration is needed, and a negative error
4060  * code upon failure.
4061  */
4062 static int
4063 recv_bm_rle_bits(struct drbd_conf *mdev,
4064                 struct p_compressed_bm *p,
4065                  struct bm_xfer_ctx *c,
4066                  unsigned int len)
4067 {
4068         struct bitstream bs;
4069         u64 look_ahead;
4070         u64 rl;
4071         u64 tmp;
4072         unsigned long s = c->bit_offset;
4073         unsigned long e;
4074         int toggle = dcbp_get_start(p);
4075         int have;
4076         int bits;
4077
4078         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4079
4080         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4081         if (bits < 0)
4082                 return -EIO;
4083
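        /* Decode the variable-length-coded run lengths.  Runs alternate
         * between clear and set bits; only the "set" runs are applied to
         * the bitmap.  look_ahead/have form a 64 bit window that is
         * refilled from the bit stream after each consumed code. */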
4084         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4085                 bits = vli_decode_bits(&rl, look_ahead);
4086                 if (bits <= 0)
4087                         return -EIO;
4088
4089                 if (toggle) {
4090                         e = s + rl - 1;
4091                         if (e >= c->bm_bits) {
4092                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4093                                 return -EIO;
4094                         }
4095                         _drbd_bm_set_bits(mdev, s, e);
4096                 }
4097
4098                 if (have < bits) {
4099                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4100                                 have, bits, look_ahead,
4101                                 (unsigned int)(bs.cur.b - p->code),
4102                                 (unsigned int)bs.buf_len);
4103                         return -EIO;
4104                 }
4105                 look_ahead >>= bits;
4106                 have -= bits;
4107
4108                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4109                 if (bits < 0)
4110                         return -EIO;
4111                 look_ahead |= tmp << have;
4112                 have += bits;
4113         }
4114
4115         c->bit_offset = s;
4116         bm_xfer_ctx_bit_to_word_offset(c);
4117
4118         return (s != c->bm_bits);
4119 }
4120
4121 /**
4122  * decode_bitmap_c
4123  *
4124  * Return 0 when done, 1 when another iteration is needed, and a negative error
4125  * code upon failure.
4126  */
4127 static int
4128 decode_bitmap_c(struct drbd_conf *mdev,
4129                 struct p_compressed_bm *p,
4130                 struct bm_xfer_ctx *c,
4131                 unsigned int len)
4132 {
4133         if (dcbp_get_code(p) == RLE_VLI_Bits)
4134                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4135
4136         /* other variants had been implemented for evaluation,
4137          * but have been dropped as this one turned out to be "best"
4138          * during all our tests. */
4139
4140         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4141         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4142         return -EIO;
4143 }
4144
4145 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4146                 const char *direction, struct bm_xfer_ctx *c)
4147 {
4148         /* what would it take to transfer it "plaintext" */
4149         unsigned int header_size = drbd_header_size(mdev->tconn);
4150         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4151         unsigned int plain =
4152                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4153                 c->bm_words * sizeof(unsigned long);
4154         unsigned int total = c->bytes[0] + c->bytes[1];
4155         unsigned int r;
4156
4157         /* total cannot be zero, but just in case: */
4158         if (total == 0)
4159                 return;
4160
4161         /* don't report if not compressed */
4162         if (total >= plain)
4163                 return;
4164
4165         /* total < plain. check for overflow, still */
4166         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4167                                     : (1000 * total / plain);
4168
4169         if (r > 1000)
4170                 r = 1000;
4171
4172         r = 1000 - r;
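        /* r now holds the space saving in per mille (printed below as a
         * percentage with one decimal place). */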
4173         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4174              "total %u; compression: %u.%u%%\n",
4175                         direction,
4176                         c->bytes[1], c->packets[1],
4177                         c->bytes[0], c->packets[0],
4178                         total, r/10, r % 10);
4179 }
4180
4181 /* Since we are processing the bitfield from lower addresses to higher,
4182    it does not matter whether we process it in 32 bit or 64 bit chunks,
4183    as long as it is little endian. (Understand it as a byte stream,
4184    beginning with the lowest byte...) If we used big endian, we would
4185    need to process it from the highest address to the lowest in order
4186    to be agnostic to the 32 vs 64 bit issue.
4187
4188    Returns 0 on success, a negative error code otherwise. */
4189 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4190 {
4191         struct drbd_conf *mdev;
4192         struct bm_xfer_ctx c;
4193         int err;
4194
4195         mdev = vnr_to_mdev(tconn, pi->vnr);
4196         if (!mdev)
4197                 return -EIO;
4198
4199         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4200         /* you are supposed to send additional out-of-sync information
4201          * if you actually set bits during this phase */
4202
4203         c = (struct bm_xfer_ctx) {
4204                 .bm_bits = drbd_bm_bits(mdev),
4205                 .bm_words = drbd_bm_words(mdev),
4206         };
4207
4208         for(;;) {
4209                 if (pi->cmd == P_BITMAP)
4210                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4211                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4212                         /* MAYBE: sanity check that we speak proto >= 90,
4213                          * and the feature is enabled! */
4214                         struct p_compressed_bm *p = pi->data;
4215
4216                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4217                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4218                                 err = -EIO;
4219                                 goto out;
4220                         }
4221                         if (pi->size <= sizeof(*p)) {
4222                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4223                                 err = -EIO;
4224                                 goto out;
4225                         }
4226                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4227                         if (err)
4228                                goto out;
4229                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4230                 } else {
4231                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4232                         err = -EIO;
4233                         goto out;
4234                 }
4235
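                /* Account the transfer: index 1 counts plain P_BITMAP packets,
                 * index 0 counts compressed ones; see INFO_bm_xfer_stats(). */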
4236                 c.packets[pi->cmd == P_BITMAP]++;
4237                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4238
4239                 if (err <= 0) {
4240                         if (err < 0)
4241                                 goto out;
4242                         break;
4243                 }
4244                 err = drbd_recv_header(mdev->tconn, pi);
4245                 if (err)
4246                         goto out;
4247         }
4248
4249         INFO_bm_xfer_stats(mdev, "receive", &c);
4250
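        /* As sync target, reply with our own bitmap so both sides agree on
         * the set of out-of-sync blocks, then move on to WFSyncUUID. */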
4251         if (mdev->state.conn == C_WF_BITMAP_T) {
4252                 enum drbd_state_rv rv;
4253
4254                 err = drbd_send_bitmap(mdev);
4255                 if (err)
4256                         goto out;
4257                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4258                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4259                 D_ASSERT(rv == SS_SUCCESS);
4260         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4261                 /* admin may have requested C_DISCONNECTING,
4262                  * other threads may have noticed network errors */
4263                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4264                     drbd_conn_str(mdev->state.conn));
4265         }
4266         err = 0;
4267
4268  out:
4269         drbd_bm_unlock(mdev);
4270         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4271                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4272         return err;
4273 }
4274
4275 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4276 {
4277         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4278                  pi->cmd, pi->size);
4279
4280         return ignore_remaining_packet(tconn, pi);
4281 }
4282
4283 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4284 {
4285         /* Make sure we've acked all the TCP data associated
4286          * with the data requests being unplugged */
4287         drbd_tcp_quickack(tconn->data.socket);
4288
4289         return 0;
4290 }
4291
4292 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4293 {
4294         struct drbd_conf *mdev;
4295         struct p_block_desc *p = pi->data;
4296
4297         mdev = vnr_to_mdev(tconn, pi->vnr);
4298         if (!mdev)
4299                 return -EIO;
4300
4301         switch (mdev->state.conn) {
4302         case C_WF_SYNC_UUID:
4303         case C_WF_BITMAP_T:
4304         case C_BEHIND:
4305                 break;
4306         default:
4307                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4308                                 drbd_conn_str(mdev->state.conn));
4309         }
4310
4311         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4312
4313         return 0;
4314 }
4315
4316 struct data_cmd {
4317         int expect_payload;
4318         size_t pkt_size;
4319         int (*fn)(struct drbd_tconn *, struct packet_info *);
4320 };
4321
4322 static struct data_cmd drbd_cmd_handler[] = {
4323         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4324         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4325         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4326         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4327         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4328         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4329         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4330         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4331         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4332         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4333         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4334         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4335         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4336         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4337         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4338         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4339         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4340         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4341         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4342         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4343         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4344         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4345         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4346         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4347 };
4348
4349 static void drbdd(struct drbd_tconn *tconn)
4350 {
4351         struct packet_info pi;
4352         size_t shs; /* sub header size */
4353         int err;
4354
4355         while (get_t_state(&tconn->receiver) == RUNNING) {
4356                 struct data_cmd *cmd;
4357
4358                 drbd_thread_current_set_cpu(&tconn->receiver);
4359                 if (drbd_recv_header(tconn, &pi))
4360                         goto err_out;
4361
4362                 cmd = &drbd_cmd_handler[pi.cmd];
4363                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4364                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4365                                  cmdname(pi.cmd), pi.cmd);
4366                         goto err_out;
4367                 }
4368
4369                 shs = cmd->pkt_size;
4370                 if (pi.size > shs && !cmd->expect_payload) {
4371                         conn_err(tconn, "No payload expected %s l:%d\n",
4372                                  cmdname(pi.cmd), pi.size);
4373                         goto err_out;
4374                 }
4375
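                /* Read the fixed-size sub header into the preallocated
                 * receive buffer first; the handler below then consumes the
                 * remaining pi.size bytes of payload itself. */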
4376                 if (shs) {
4377                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4378                         if (err)
4379                                 goto err_out;
4380                         pi.size -= shs;
4381                 }
4382
4383                 err = cmd->fn(tconn, &pi);
4384                 if (err) {
4385                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4386                                  cmdname(pi.cmd), err, pi.size);
4387                         goto err_out;
4388                 }
4389         }
4390         return;
4391
4392     err_out:
4393         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4394 }
4395
4396 void conn_flush_workqueue(struct drbd_tconn *tconn)
4397 {
4398         struct drbd_wq_barrier barr;
4399
4400         barr.w.cb = w_prev_work_done;
4401         barr.w.tconn = tconn;
4402         init_completion(&barr.done);
4403         drbd_queue_work(&tconn->data.work, &barr.w);
4404         wait_for_completion(&barr.done);
4405 }
4406
4407 static void conn_disconnect(struct drbd_tconn *tconn)
4408 {
4409         struct drbd_conf *mdev;
4410         enum drbd_conns oc;
4411         int vnr;
4412
4413         if (tconn->cstate == C_STANDALONE)
4414                 return;
4415
4416         /* We are about to start the cleanup after connection loss.
4417          * Make sure drbd_make_request knows about that.
4418          * Usually we should be in some network failure state already,
4419          * but just in case we are not, we fix it up here.
4420          */
4421         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4422
4423         /* asender does not clean up anything. it must not interfere, either */
4424         drbd_thread_stop(&tconn->asender);
4425         drbd_free_sock(tconn);
4426
4427         rcu_read_lock();
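        /* For each volume: take a reference and drop the RCU read lock
         * before calling drbd_disconnected(), which may sleep. */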
4428         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4429                 kref_get(&mdev->kref);
4430                 rcu_read_unlock();
4431                 drbd_disconnected(mdev);
4432                 kref_put(&mdev->kref, &drbd_minor_destroy);
4433                 rcu_read_lock();
4434         }
4435         rcu_read_unlock();
4436
4437         if (!list_empty(&tconn->current_epoch->list))
4438                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4439         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4440         atomic_set(&tconn->current_epoch->epoch_size, 0);
4441
4442         conn_info(tconn, "Connection closed\n");
4443
4444         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4445                 conn_try_outdate_peer_async(tconn);
4446
4447         spin_lock_irq(&tconn->req_lock);
4448         oc = tconn->cstate;
4449         if (oc >= C_UNCONNECTED)
4450                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4451
4452         spin_unlock_irq(&tconn->req_lock);
4453
4454         if (oc == C_DISCONNECTING)
4455                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4456 }
4457
4458 static int drbd_disconnected(struct drbd_conf *mdev)
4459 {
4460         unsigned int i;
4461
4462         /* wait for current activity to cease. */
4463         spin_lock_irq(&mdev->tconn->req_lock);
4464         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4465         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4466         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4467         spin_unlock_irq(&mdev->tconn->req_lock);
4468
4469         /* We do not have data structures that would allow us to
4470          * get the rs_pending_cnt down to 0 again.
4471          *  * On C_SYNC_TARGET we do not have any data structures describing
4472          *    the pending RSDataRequest's we have sent.
4473          *  * On C_SYNC_SOURCE there is no data structure that tracks
4474          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4475          *  And no, it is not the sum of the reference counts in the
4476          *  resync_LRU. The resync_LRU tracks the whole operation including
4477          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4478          *  on the fly. */
4479         drbd_rs_cancel_all(mdev);
4480         mdev->rs_total = 0;
4481         mdev->rs_failed = 0;
4482         atomic_set(&mdev->rs_pending_cnt, 0);
4483         wake_up(&mdev->misc_wait);
4484
4485         del_timer_sync(&mdev->resync_timer);
4486         resync_timer_fn((unsigned long)mdev);
4487
4488         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4489          * w_make_resync_request etc. which may still be on the worker queue
4490          * to be "canceled" */
4491         drbd_flush_workqueue(mdev);
4492
4493         drbd_finish_peer_reqs(mdev);
4494
4495         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4496            might have queued more work. The one before drbd_finish_peer_reqs() is
4497            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4498         drbd_flush_workqueue(mdev);
4499
4500         kfree(mdev->p_uuid);
4501         mdev->p_uuid = NULL;
4502
4503         if (!drbd_suspended(mdev))
4504                 tl_clear(mdev->tconn);
4505
4506         drbd_md_sync(mdev);
4507
4508         /* serialize with bitmap writeout triggered by the state change,
4509          * if any. */
4510         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4511
4512         /* tcp_close and release of sendpage pages can be deferred.  I don't
4513          * want to use SO_LINGER, because apparently it can be deferred for
4514          * more than 20 seconds (longest time I checked).
4515          *
4516          * Actually we don't care for exactly when the network stack does its
4517          * put_page(), but release our reference on these pages right here.
4518          */
4519         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4520         if (i)
4521                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4522         i = atomic_read(&mdev->pp_in_use_by_net);
4523         if (i)
4524                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4525         i = atomic_read(&mdev->pp_in_use);
4526         if (i)
4527                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4528
4529         D_ASSERT(list_empty(&mdev->read_ee));
4530         D_ASSERT(list_empty(&mdev->active_ee));
4531         D_ASSERT(list_empty(&mdev->sync_ee));
4532         D_ASSERT(list_empty(&mdev->done_ee));
4533
4534         return 0;
4535 }
4536
4537 /*
4538  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4539  * we can agree on is stored in agreed_pro_version.
4540  *
4541  * feature flags and the reserved array should be enough room for future
4542  * enhancements of the handshake protocol, and possible plugins...
4543  *
4544  * For now, they are expected to be zero, but are ignored in any case.
4545  */
4546 static int drbd_send_features(struct drbd_tconn *tconn)
4547 {
4548         struct drbd_socket *sock;
4549         struct p_connection_features *p;
4550
4551         sock = &tconn->data;
4552         p = conn_prepare_command(tconn, sock);
4553         if (!p)
4554                 return -EIO;
4555         memset(p, 0, sizeof(*p));
4556         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4557         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4558         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4559 }
4560
4561 /*
4562  * return values:
4563  *   1 yes, we have a valid connection
4564  *   0 oops, did not work out, please try again
4565  *  -1 peer talks different language,
4566  *     no point in trying again, please go standalone.
4567  */
4568 static int drbd_do_features(struct drbd_tconn *tconn)
4569 {
4570         /* ASSERT current == tconn->receiver ... */
4571         struct p_connection_features *p;
4572         const int expect = sizeof(struct p_connection_features);
4573         struct packet_info pi;
4574         int err;
4575
4576         err = drbd_send_features(tconn);
4577         if (err)
4578                 return 0;
4579
4580         err = drbd_recv_header(tconn, &pi);
4581         if (err)
4582                 return 0;
4583
4584         if (pi.cmd != P_CONNECTION_FEATURES) {
4585                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4586                          cmdname(pi.cmd), pi.cmd);
4587                 return -1;
4588         }
4589
4590         if (pi.size != expect) {
4591                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4592                      expect, pi.size);
4593                 return -1;
4594         }
4595
4596         p = pi.data;
4597         err = drbd_recv_all_warn(tconn, p, expect);
4598         if (err)
4599                 return 0;
4600
4601         p->protocol_min = be32_to_cpu(p->protocol_min);
4602         p->protocol_max = be32_to_cpu(p->protocol_max);
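        /* Presumably a peer that reports no maximum only speaks exactly
         * protocol_min; treat it that way. */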
4603         if (p->protocol_max == 0)
4604                 p->protocol_max = p->protocol_min;
4605
4606         if (PRO_VERSION_MAX < p->protocol_min ||
4607             PRO_VERSION_MIN > p->protocol_max)
4608                 goto incompat;
4609
4610         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4611
4612         conn_info(tconn, "Handshake successful: "
4613              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4614
4615         return 1;
4616
4617  incompat:
4618         conn_err(tconn, "incompatible DRBD dialects: "
4619             "I support %d-%d, peer supports %d-%d\n",
4620             PRO_VERSION_MIN, PRO_VERSION_MAX,
4621             p->protocol_min, p->protocol_max);
4622         return -1;
4623 }
4624
4625 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4626 static int drbd_do_auth(struct drbd_tconn *tconn)
4627 {
4628         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4629         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4630         return -1;
4631 }
4632 #else
4633 #define CHALLENGE_LEN 64
4634
4635 /* Return value:
4636         1 - auth succeeded,
4637         0 - failed, try again (network error),
4638         -1 - auth failed, don't try again.
4639 */
4640
4641 static int drbd_do_auth(struct drbd_tconn *tconn)
4642 {
4643         struct drbd_socket *sock;
4644         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4645         struct scatterlist sg;
4646         char *response = NULL;
4647         char *right_response = NULL;
4648         char *peers_ch = NULL;
4649         unsigned int key_len;
4650         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4651         unsigned int resp_size;
4652         struct hash_desc desc;
4653         struct packet_info pi;
4654         struct net_conf *nc;
4655         int err, rv;
4656
4657         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4658
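        /* Challenge/response authentication: send our random challenge,
         * answer the peer's challenge with an HMAC keyed by the shared
         * secret, and verify the peer's answer against the HMAC of our
         * own challenge. */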
4659         rcu_read_lock();
4660         nc = rcu_dereference(tconn->net_conf);
4661         key_len = strlen(nc->shared_secret);
4662         memcpy(secret, nc->shared_secret, key_len);
4663         rcu_read_unlock();
4664
4665         desc.tfm = tconn->cram_hmac_tfm;
4666         desc.flags = 0;
4667
4668         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4669         if (rv) {
4670                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4671                 rv = -1;
4672                 goto fail;
4673         }
4674
4675         get_random_bytes(my_challenge, CHALLENGE_LEN);
4676
4677         sock = &tconn->data;
4678         if (!conn_prepare_command(tconn, sock)) {
4679                 rv = 0;
4680                 goto fail;
4681         }
4682         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4683                                 my_challenge, CHALLENGE_LEN);
4684         if (!rv)
4685                 goto fail;
4686
4687         err = drbd_recv_header(tconn, &pi);
4688         if (err) {
4689                 rv = 0;
4690                 goto fail;
4691         }
4692
4693         if (pi.cmd != P_AUTH_CHALLENGE) {
4694                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4695                          cmdname(pi.cmd), pi.cmd);
4696                 rv = 0;
4697                 goto fail;
4698         }
4699
4700         if (pi.size > CHALLENGE_LEN * 2) {
4701                 conn_err(tconn, "AuthChallenge payload too big.\n");
4702                 rv = -1;
4703                 goto fail;
4704         }
4705
4706         peers_ch = kmalloc(pi.size, GFP_NOIO);
4707         if (peers_ch == NULL) {
4708                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4709                 rv = -1;
4710                 goto fail;
4711         }
4712
4713         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4714         if (err) {
4715                 rv = 0;
4716                 goto fail;
4717         }
4718
4719         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4720         response = kmalloc(resp_size, GFP_NOIO);
4721         if (response == NULL) {
4722                 conn_err(tconn, "kmalloc of response failed\n");
4723                 rv = -1;
4724                 goto fail;
4725         }
4726
4727         sg_init_table(&sg, 1);
4728         sg_set_buf(&sg, peers_ch, pi.size);
4729
4730         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4731         if (rv) {
4732                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4733                 rv = -1;
4734                 goto fail;
4735         }
4736
4737         if (!conn_prepare_command(tconn, sock)) {
4738                 rv = 0;
4739                 goto fail;
4740         }
4741         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4742                                 response, resp_size);
4743         if (!rv)
4744                 goto fail;
4745
4746         err = drbd_recv_header(tconn, &pi);
4747         if (err) {
4748                 rv = 0;
4749                 goto fail;
4750         }
4751
4752         if (pi.cmd != P_AUTH_RESPONSE) {
4753                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4754                          cmdname(pi.cmd), pi.cmd);
4755                 rv = 0;
4756                 goto fail;
4757         }
4758
4759         if (pi.size != resp_size) {
4760                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4761                 rv = 0;
4762                 goto fail;
4763         }
4764
4765         err = drbd_recv_all_warn(tconn, response, resp_size);
4766         if (err) {
4767                 rv = 0;
4768                 goto fail;
4769         }
4770
4771         right_response = kmalloc(resp_size, GFP_NOIO);
4772         if (right_response == NULL) {
4773                 conn_err(tconn, "kmalloc of right_response failed\n");
4774                 rv = -1;
4775                 goto fail;
4776         }
4777
4778         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4779
4780         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4781         if (rv) {
4782                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4783                 rv = -1;
4784                 goto fail;
4785         }
4786
4787         rv = !memcmp(response, right_response, resp_size);
4788
4789         if (rv)
4790                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4791                      resp_size);
4792         else
4793                 rv = -1;
4794
4795  fail:
4796         kfree(peers_ch);
4797         kfree(response);
4798         kfree(right_response);
4799
4800         return rv;
4801 }
4802 #endif
4803
4804 int drbdd_init(struct drbd_thread *thi)
4805 {
4806         struct drbd_tconn *tconn = thi->tconn;
4807         int h;
4808
4809         conn_info(tconn, "receiver (re)started\n");
4810
4811         do {
4812                 h = conn_connect(tconn);
4813                 if (h == 0) {
4814                         conn_disconnect(tconn);
4815                         schedule_timeout_interruptible(HZ);
4816                 }
4817                 if (h == -1) {
4818                         conn_warn(tconn, "Discarding network configuration.\n");
4819                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4820                 }
4821         } while (h == 0);
4822
4823         if (h > 0)
4824                 drbdd(tconn);
4825
4826         conn_disconnect(tconn);
4827
4828         conn_info(tconn, "receiver terminated\n");
4829         return 0;
4830 }
4831
4832 /* ********* acknowledge sender ******** */
4833
4834 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4835 {
4836         struct p_req_state_reply *p = pi->data;
4837         int retcode = be32_to_cpu(p->retcode);
4838
4839         if (retcode >= SS_SUCCESS) {
4840                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4841         } else {
4842                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4843                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4844                          drbd_set_st_err_str(retcode), retcode);
4845         }
4846         wake_up(&tconn->ping_wait);
4847
4848         return 0;
4849 }
4850
4851 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4852 {
4853         struct drbd_conf *mdev;
4854         struct p_req_state_reply *p = pi->data;
4855         int retcode = be32_to_cpu(p->retcode);
4856
4857         mdev = vnr_to_mdev(tconn, pi->vnr);
4858         if (!mdev)
4859                 return -EIO;
4860
4861         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4862                 D_ASSERT(tconn->agreed_pro_version < 100);
4863                 return got_conn_RqSReply(tconn, pi);
4864         }
4865
4866         if (retcode >= SS_SUCCESS) {
4867                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4868         } else {
4869                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4870                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4871                         drbd_set_st_err_str(retcode), retcode);
4872         }
4873         wake_up(&mdev->state_wait);
4874
4875         return 0;
4876 }
4877
4878 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4879 {
4880         return drbd_send_ping_ack(tconn);
4881
4882 }
4883
4884 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4885 {
4886         /* restore idle timeout */
4887         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4888         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4889                 wake_up(&tconn->ping_wait);
4890
4891         return 0;
4892 }
4893
4894 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4895 {
4896         struct drbd_conf *mdev;
4897         struct p_block_ack *p = pi->data;
4898         sector_t sector = be64_to_cpu(p->sector);
4899         int blksize = be32_to_cpu(p->blksize);
4900
4901         mdev = vnr_to_mdev(tconn, pi->vnr);
4902         if (!mdev)
4903                 return -EIO;
4904
4905         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4906
4907         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4908
4909         if (get_ldev(mdev)) {
4910                 drbd_rs_complete_io(mdev, sector);
4911                 drbd_set_in_sync(mdev, sector, blksize);
4912                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4913                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4914                 put_ldev(mdev);
4915         }
4916         dec_rs_pending(mdev);
4917         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4918
4919         return 0;
4920 }
4921
4922 static int
4923 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4924                               struct rb_root *root, const char *func,
4925                               enum drbd_req_event what, bool missing_ok)
4926 {
4927         struct drbd_request *req;
4928         struct bio_and_error m;
4929
4930         spin_lock_irq(&mdev->tconn->req_lock);
4931         req = find_request(mdev, root, id, sector, missing_ok, func);
4932         if (unlikely(!req)) {
4933                 spin_unlock_irq(&mdev->tconn->req_lock);
4934                 return -EIO;
4935         }
4936         __req_mod(req, what, &m);
4937         spin_unlock_irq(&mdev->tconn->req_lock);
4938
4939         if (m.bio)
4940                 complete_master_bio(mdev, &m);
4941         return 0;
4942 }
4943
4944 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4945 {
4946         struct drbd_conf *mdev;
4947         struct p_block_ack *p = pi->data;
4948         sector_t sector = be64_to_cpu(p->sector);
4949         int blksize = be32_to_cpu(p->blksize);
4950         enum drbd_req_event what;
4951
4952         mdev = vnr_to_mdev(tconn, pi->vnr);
4953         if (!mdev)
4954                 return -EIO;
4955
4956         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4957
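        /* Acks for resync writes carry ID_SYNCER and have no request object
         * in the write_requests tree; just mark the area in sync and drop
         * the pending count.  Application writes are matched below. */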
4958         if (p->block_id == ID_SYNCER) {
4959                 drbd_set_in_sync(mdev, sector, blksize);
4960                 dec_rs_pending(mdev);
4961                 return 0;
4962         }
4963         switch (pi->cmd) {
4964         case P_RS_WRITE_ACK:
4965                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4966                 break;
4967         case P_WRITE_ACK:
4968                 what = WRITE_ACKED_BY_PEER;
4969                 break;
4970         case P_RECV_ACK:
4971                 what = RECV_ACKED_BY_PEER;
4972                 break;
4973         case P_DISCARD_WRITE:
4974                 what = DISCARD_WRITE;
4975                 break;
4976         case P_RETRY_WRITE:
4977                 what = POSTPONE_WRITE;
4978                 break;
4979         default:
4980                 BUG();
4981         }
4982
4983         return validate_req_change_req_state(mdev, p->block_id, sector,
4984                                              &mdev->write_requests, __func__,
4985                                              what, false);
4986 }
4987
4988 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4989 {
4990         struct drbd_conf *mdev;
4991         struct p_block_ack *p = pi->data;
4992         sector_t sector = be64_to_cpu(p->sector);
4993         int size = be32_to_cpu(p->blksize);
4994         int err;
4995
4996         mdev = vnr_to_mdev(tconn, pi->vnr);
4997         if (!mdev)
4998                 return -EIO;
4999
5000         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5001
5002         if (p->block_id == ID_SYNCER) {
5003                 dec_rs_pending(mdev);
5004                 drbd_rs_failed_io(mdev, sector, size);
5005                 return 0;
5006         }
5007
5008         err = validate_req_change_req_state(mdev, p->block_id, sector,
5009                                             &mdev->write_requests, __func__,
5010                                             NEG_ACKED, true);
5011         if (err) {
5012                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5013                    The master bio might already be completed, therefore the
5014                    request is no longer in the collision hash. */
5015                 /* In Protocol B we might already have got a P_RECV_ACK
5016                    but then get a P_NEG_ACK afterwards. */
5017                 drbd_set_out_of_sync(mdev, sector, size);
5018         }
5019         return 0;
5020 }
5021
5022 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5023 {
5024         struct drbd_conf *mdev;
5025         struct p_block_ack *p = pi->data;
5026         sector_t sector = be64_to_cpu(p->sector);
5027
5028         mdev = vnr_to_mdev(tconn, pi->vnr);
5029         if (!mdev)
5030                 return -EIO;
5031
5032         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5033
5034         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5035             (unsigned long long)sector, be32_to_cpu(p->blksize));
5036
5037         return validate_req_change_req_state(mdev, p->block_id, sector,
5038                                              &mdev->read_requests, __func__,
5039                                              NEG_ACKED, false);
5040 }
5041
5042 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5043 {
5044         struct drbd_conf *mdev;
5045         sector_t sector;
5046         int size;
5047         struct p_block_ack *p = pi->data;
5048
5049         mdev = vnr_to_mdev(tconn, pi->vnr);
5050         if (!mdev)
5051                 return -EIO;
5052
5053         sector = be64_to_cpu(p->sector);
5054         size = be32_to_cpu(p->blksize);
5055
5056         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5057
5058         dec_rs_pending(mdev);
5059
5060         if (get_ldev_if_state(mdev, D_FAILED)) {
5061                 drbd_rs_complete_io(mdev, sector);
5062                 switch (pi->cmd) {
5063                 case P_NEG_RS_DREPLY:
5064                         drbd_rs_failed_io(mdev, sector, size);
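                        /* fall through */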
5065                 case P_RS_CANCEL:
5066                         break;
5067                 default:
5068                         BUG();
5069                 }
5070                 put_ldev(mdev);
5071         }
5072
5073         return 0;
5074 }
5075
5076 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5077 {
5078         struct drbd_conf *mdev;
5079         struct p_barrier_ack *p = pi->data;
5080
5081         mdev = vnr_to_mdev(tconn, pi->vnr);
5082         if (!mdev)
5083                 return -EIO;
5084
5085         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
5086
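        /* In Ahead/Behind mode: once no application writes are in flight
         * anymore, arm the timer that switches us back to a real resync
         * (SyncSource).  The test_and_set_bit makes sure we arm it only once. */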
5087         if (mdev->state.conn == C_AHEAD &&
5088             atomic_read(&mdev->ap_in_flight) == 0 &&
5089             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5090                 mdev->start_resync_timer.expires = jiffies + HZ;
5091                 add_timer(&mdev->start_resync_timer);
5092         }
5093
5094         return 0;
5095 }
5096
5097 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5098 {
5099         struct drbd_conf *mdev;
5100         struct p_block_ack *p = pi->data;
5101         struct drbd_work *w;
5102         sector_t sector;
5103         int size;
5104
5105         mdev = vnr_to_mdev(tconn, pi->vnr);
5106         if (!mdev)
5107                 return -EIO;
5108
5109         sector = be64_to_cpu(p->sector);
5110         size = be32_to_cpu(p->blksize);
5111
5112         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5113
5114         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5115                 drbd_ov_out_of_sync_found(mdev, sector, size);
5116         else
5117                 ov_out_of_sync_print(mdev);
5118
5119         if (!get_ldev(mdev))
5120                 return 0;
5121
5122         drbd_rs_complete_io(mdev, sector);
5123         dec_rs_pending(mdev);
5124
5125         --mdev->ov_left;
5126
5127         /* let's advance progress step marks only for every other megabyte */
5128         if ((mdev->ov_left & 0x200) == 0x200)
5129                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5130
5131         if (mdev->ov_left == 0) {
5132                 w = kmalloc(sizeof(*w), GFP_NOIO);
5133                 if (w) {
5134                         w->cb = w_ov_finished;
5135                         w->mdev = mdev;
5136                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5137                 } else {
5138                         dev_err(DEV, "kmalloc(w) failed.\n");
5139                         ov_out_of_sync_print(mdev);
5140                         drbd_resync_finished(mdev);
5141                 }
5142         }
5143         put_ldev(mdev);
5144         return 0;
5145 }
5146
5147 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5148 {
5149         return 0;
5150 }
5151
5152 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5153 {
5154         struct drbd_conf *mdev;
5155         int vnr, not_empty = 0;
5156
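        /* Process the done_ee lists of all volumes; repeat until every
         * done_ee list is found empty when re-checked under req_lock. */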
5157         do {
5158                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5159                 flush_signals(current);
5160
5161                 rcu_read_lock();
5162                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5163                         kref_get(&mdev->kref);
5164                         rcu_read_unlock();
5165                         if (drbd_finish_peer_reqs(mdev)) {
5166                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5167                                 return 1;
5168                         }
5169                         kref_put(&mdev->kref, &drbd_minor_destroy);
5170                         rcu_read_lock();
5171                 }
5172                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5173
5174                 spin_lock_irq(&tconn->req_lock);
5175                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5176                         not_empty = !list_empty(&mdev->done_ee);
5177                         if (not_empty)
5178                                 break;
5179                 }
5180                 spin_unlock_irq(&tconn->req_lock);
5181                 rcu_read_unlock();
5182         } while (not_empty);
5183
5184         return 0;
5185 }
5186
5187 struct asender_cmd {
5188         size_t pkt_size;
5189         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5190 };
5191
5192 static struct asender_cmd asender_tbl[] = {
5193         [P_PING]            = { 0, got_Ping },
5194         [P_PING_ACK]        = { 0, got_PingAck },
5195         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5196         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5197         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5198         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5199         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5200         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5201         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5202         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5203         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5204         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5205         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5206         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5207         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5208         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5209         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5210 };
5211
5212 int drbd_asender(struct drbd_thread *thi)
5213 {
5214         struct drbd_tconn *tconn = thi->tconn;
5215         struct asender_cmd *cmd = NULL;
5216         struct packet_info pi;
5217         int rv;
5218         void *buf    = tconn->meta.rbuf;
5219         int received = 0;
5220         unsigned int header_size = drbd_header_size(tconn);
5221         int expect   = header_size;
5222         bool ping_timeout_active = false;
5223         struct net_conf *nc;
5224         int ping_timeo, tcp_cork, ping_int;
5225
5226         current->policy = SCHED_RR;  /* Make this a realtime task! */
5227         current->rt_priority = 2;    /* more important than all other tasks */
5228
5229         while (get_t_state(thi) == RUNNING) {
5230                 drbd_thread_current_set_cpu(thi);
5231
5232                 rcu_read_lock();
5233                 nc = rcu_dereference(tconn->net_conf);
5234                 ping_timeo = nc->ping_timeo;
5235                 tcp_cork = nc->tcp_cork;
5236                 ping_int = nc->ping_int;
5237                 rcu_read_unlock();
5238
5239                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5240                         if (drbd_send_ping(tconn)) {
5241                                 conn_err(tconn, "drbd_send_ping has failed\n");
5242                                 goto reconnect;
5243                         }
5244                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5245                         ping_timeout_active = true;
5246                 }
5247
5248                 /* TODO: conditionally cork; it may hurt latency if we cork without
5249                    much to send */
5250                 if (tcp_cork)
5251                         drbd_tcp_cork(tconn->meta.socket);
5252                 if (tconn_finish_peer_reqs(tconn)) {
5253                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5254                         goto reconnect;
5255                 }
5256                 /* but unconditionally uncork unless disabled */
5257                 if (tcp_cork)
5258                         drbd_tcp_uncork(tconn->meta.socket);
5259
5260                 /* short circuit, recv_msg would return EINTR anyways. */
5261                 if (signal_pending(current))
5262                         continue;
5263
5264                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5265                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5266
5267                 flush_signals(current);
5268
5269                 /* Note:
5270                  * -EINTR        (on meta) we got a signal
5271                  * -EAGAIN       (on meta) rcvtimeo expired
5272                  * -ECONNRESET   other side closed the connection
5273                  * -ERESTARTSYS  (on data) we got a signal
5274                  * rv <  0       other than above: unexpected error!
5275                  * rv == expected: full header or command
5276                  * rv <  expected: "woken" by signal during receive
5277                  * rv == 0       : "connection shut down by peer"
5278                  */
5279                 if (likely(rv > 0)) {
5280                         received += rv;
5281                         buf      += rv;
5282                 } else if (rv == 0) {
5283                         conn_err(tconn, "meta connection shut down by peer.\n");
5284                         goto reconnect;
5285                 } else if (rv == -EAGAIN) {
5286                         /* If the data socket received something meanwhile,
5287                          * that is good enough: peer is still alive. */
5288                         if (time_after(tconn->last_received,
5289                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5290                                 continue;
5291                         if (ping_timeout_active) {
5292                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5293                                 goto reconnect;
5294                         }
5295                         set_bit(SEND_PING, &tconn->flags);
5296                         continue;
5297                 } else if (rv == -EINTR) {
5298                         continue;
5299                 } else {
5300                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5301                         goto reconnect;
5302                 }
5303
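                /* First accumulate a complete header; once decoded, "expect"
                 * grows by the command's payload size, and the command is
                 * dispatched only when the full packet has arrived. */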
5304                 if (received == expect && cmd == NULL) {
5305                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5306                                 goto reconnect;
5307                         cmd = &asender_tbl[pi.cmd];
5308                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5309                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5310                                          cmdname(pi.cmd), pi.cmd);
5311                                 goto disconnect;
5312                         }
5313                         expect = header_size + cmd->pkt_size;
5314                         if (pi.size != expect - header_size) {
5315                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5316                                         pi.cmd, pi.size);
5317                                 goto reconnect;
5318                         }
5319                 }
5320                 if (received == expect) {
5321                         bool err;
5322
5323                         err = cmd->fn(tconn, &pi);
5324                         if (err) {
5325                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5326                                 goto reconnect;
5327                         }
5328
5329                         tconn->last_received = jiffies;
5330
5331                         if (cmd == &asender_tbl[P_PING_ACK]) {
5332                                 /* restore idle timeout */
5333                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5334                                 ping_timeout_active = false;
5335                         }
5336
5337                         buf      = tconn->meta.rbuf;
5338                         received = 0;
5339                         expect   = header_size;
5340                         cmd      = NULL;
5341                 }
5342         }
5343
5344         if (0) {
5345 reconnect:
5346                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5347         }
5348         if (0) {
5349 disconnect:
5350                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5351         }
5352         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5353
5354         conn_info(tconn, "asender terminated\n");
5355
5356         return 0;
5357 }