drbd: Try to connect to peer only once per cycle
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_receiver.c
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
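/* For illustration: a chain p1 -> p2 -> p3 built by these helpers is
 * represented as
 *   page_private(p1) == (unsigned long)p2,
 *   page_private(p2) == (unsigned long)p3,
 *   page_private(p3) == 0,   end-of-chain marker
 * which is what page_chain_next() walks and what set_page_private(page, 0)
 * terminates. */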

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

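/* Put every page of the chain back to the system (put_page) and return
 * the number of pages that were freed. */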
static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

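/* Prepend the chain [chain_first .. chain_last] to *head.  The #if 1
 * sanity check verifies that chain_last really is the tail of chain_first. */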
static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

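/* Grab @number pages: first try to take them off the global drbd_pp_pool,
 * otherwise fall back to alloc_page(GFP_TRY).  On partial failure the
 * already allocated pages are handed to the pool and NULL is returned;
 * the caller (drbd_alloc_pages) will retry "soon". */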
static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req;
        struct list_head *le, *tle;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first unfinished one, we can
           stop examining the list... */

        list_for_each_safe(le, tle, &mdev->net_ee) {
                peer_req = list_entry(le, struct drbd_peer_request, w.list);
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(le, to_be_freed);
        }
}

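/* Move finished entries off mdev->net_ee while holding req_lock, then free
 * them outside the lock so their pages become available again for
 * drbd_alloc_pages(). */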
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:       DRBD device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
                              bool retry)
{
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        int mxb;

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyways. */
        rcu_read_lock();
        nc = rcu_dereference(mdev->tconn->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&mdev->pp_in_use) < mxb)
                page = __drbd_alloc_pages(mdev, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(mdev);

                if (atomic_read(&mdev->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(mdev, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &mdev->pp_in_use);
        return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

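/* Allocate a peer request object and, for data_size != 0, a page chain big
 * enough to hold the payload.  If gfp_mask allows waiting (__GFP_WAIT), the
 * page allocation may block until other peer requests release pages. */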
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
                    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

        if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (data_size) {
                page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
        }

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->w.mdev = mdev;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

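/* Counterpart to drbd_alloc_peer_req(): free an optional digest, hand the
 * page chain back via drbd_free_pages() (accounted against pp_in_use or
 * pp_in_use_by_net, depending on @is_net), and return the object to the
 * mempool. */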
void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(mdev, peer_req->pages, is_net);
        D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &mdev->net_ee;

        spin_lock_irq(&mdev->tconn->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(mdev, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        list_splice_init(&mdev->done_ee, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);

        /* possible callbacks here:
         * e_end_block, e_end_resync_block, and e_send_discard_write.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(mdev, peer_req);
        }
        wake_up(&mdev->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&mdev->tconn->req_lock);
                io_schedule();
                finish_wait(&mdev->ee_wait, &wait);
                spin_lock_irq(&mdev->tconn->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                    struct list_head *head)
{
        spin_lock_irq(&mdev->tconn->req_lock);
        _drbd_wait_ee_list_empty(mdev, head);
        spin_unlock_irq(&mdev->tconn->req_lock);
}

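/* One-shot kernel-space recvmsg on an arbitrary socket.  Returns the number
 * of bytes received or a negative error; unlike drbd_recv() it does not loop
 * and does not touch the connection state. */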
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);

        for (;;) {
                rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
                if (rv == size)
                        break;

                /* Note:
                 * ECONNRESET   other side closed the connection
                 * ERESTARTSYS  (on  sock) we got a signal
                 */

                if (rv < 0) {
                        if (rv == -ECONNRESET)
                                conn_info(tconn, "sock was reset by peer\n");
                        else if (rv != -ERESTARTSYS)
                                conn_err(tconn, "sock_recvmsg returned %d\n", rv);
                        break;
                } else if (rv == 0) {
                        conn_info(tconn, "sock was shut down by peer\n");
                        break;
                } else {
                        /* signal came in, or peer/link went down,
                         * after we read a partial message
                         */
                        /* D_ASSERT(signal_pending(current)); */
                        break;
                }
        };

        set_fs(oldfs);

        if (rv != size)
                conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

        return rv;
}

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv(tconn, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(tconn, buf, size);
        if (err && !signal_pending(current))
                conn_warn(tconn, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}

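/* Actively connect to the peer: create a TCP socket, bind it to the
 * configured source address (port 0, so the kernel picks a free one), and
 * connect() with connect_int as timeout.  Returns the connected socket, or
 * NULL if the peer is not (yet) reachable. */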
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &tconn->my_addr, my_addr_len);

        if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

       /* explicitly bind to the configured IP as source IP
        *  for the outgoing connections.
        *  This is needed for multihomed hosts and to be
        *  able to use lo: interfaces for drbd.
        * Make sure to use 0 as port number, so linux selects
        *  a free one dynamically.
        */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_tconn *tconn;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);

};

static void incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        struct drbd_tconn *tconn = ad->tconn;

        if (sk->sk_state != TCP_ESTABLISHED)
                conn_warn(tconn, "unexpected tcp state change. sk_state = %d\n", sk->sk_state);

        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);

        sk->sk_state_change(sk);
        complete(&ad->door_bell);
}

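/* Set up the listening socket for the passive side of the handshake and
 * install incoming_connection() as sk_state_change callback, so that
 * drbd_wait_for_connect() only needs to wait for ad->door_bell. */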
static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &tconn->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

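/* Wait (connect_int, with some random jitter) for the peer to connect to
 * our listening socket, then accept and return the established socket, or
 * NULL on timeout, signal or error. */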
static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        conn_err(tconn, "accept failed, err = %d\n", err);
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return s_estab;
}

static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(tconn, sock))
                return -EIO;
        return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(tconn);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(tconn, tconn->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_conf *mdev)
{
        int err;

        atomic_set(&mdev->packet_seq, 0);
        mdev->peer_seq = 0;

        mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
                &mdev->tconn->cstate_mutex :
                &mdev->own_state_mutex;

        err = drbd_send_sync_param(mdev);
        if (!err)
                err = drbd_send_sizes(mdev, 0, 0);
        if (!err)
                err = drbd_send_uuids(mdev);
        if (!err)
                err = drbd_send_current_state(mdev);
        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
        clear_bit(RESIZE_PENDING, &mdev->flags);
        mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_tconn *tconn)
{
        struct drbd_socket sock, msock;
        struct drbd_conf *mdev;
        struct net_conf *nc;
        int vnr, timeout, h, ok;
        bool discard_my_data;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .tconn = tconn,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = tconn->data.sbuf;
        sock.rbuf = tconn->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = tconn->meta.sbuf;
        msock.rbuf = tconn->meta.rbuf;
        msock.socket = NULL;

        clear_bit(DISCARD_CONCURRENT, &tconn->flags);

        /* Assume that the peer only understands protocol 80 until we know better.  */
        tconn->agreed_pro_version = 80;

        if (prepare_listen_socket(tconn, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(tconn);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(tconn, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                msock.socket = s;
                                send_first_packet(tconn, &msock, P_INITIAL_META);
                        } else {
                                conn_err(tconn, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock.socket && msock.socket) {
                        rcu_read_lock();
                        nc = rcu_dereference(tconn->net_conf);
                        timeout = nc->ping_timeo * HZ / 10;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeout);
                        ok = drbd_socket_okay(&sock.socket);
                        ok = drbd_socket_okay(&msock.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(tconn, &ad);
                if (s) {
                        int fp = receive_first_packet(tconn, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        conn_warn(tconn, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                if (msock.socket) {
                                        conn_warn(tconn, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                }
                                msock.socket = s;
                                set_bit(DISCARD_CONCURRENT, &tconn->flags);
                                break;
                        default:
                                conn_warn(tconn, "Error receiving initial packet\n");
                                sock_release(s);
                                if (random32() & 1)
                                        goto retry;
                        }
                }

                if (tconn->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&tconn->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = drbd_socket_okay(&sock.socket);
                ok = drbd_socket_okay(&msock.socket) && ok;
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        tconn->data.socket = sock.socket;
        tconn->meta.socket = msock.socket;
        tconn->last_received = jiffies;

        h = drbd_do_features(tconn);
        if (h <= 0)
                return h;

        if (tconn->cram_hmac_tfm) {
                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
                switch (drbd_do_auth(tconn)) {
                case -1:
                        conn_err(tconn, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        conn_err(tconn, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        tconn->data.socket->sk->sk_sndtimeo = timeout;
        tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
                return -1;

        set_bit(STATE_SENT, &tconn->flags);

        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                kref_get(&mdev->kref);
                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &mdev->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &mdev->flags);

                drbd_connected(mdev);
                kref_put(&mdev->kref, &drbd_minor_destroy);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS) {
                clear_bit(STATE_SENT, &tconn->flags);
                return 0;
        }

        drbd_thread_start(&tconn->asender);

        mutex_lock(&tconn->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        tconn->net_conf->discard_my_data = 0;
        mutex_unlock(&tconn->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}

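/* Parse one of the three on-the-wire header formats (protocol 80, 95 or
 * 100) into a struct packet_info.  The expected format is determined by the
 * negotiated header size and cross-checked against its magic value. */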
static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(tconn);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        conn_err(tconn, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         tconn->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
        void *buffer = tconn->data.rbuf;
        int err;

        err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
        if (err)
                return err;

        err = decode_header(tconn, buffer, pi);
        tconn->last_received = jiffies;

        return err;
}

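/* Issue a flush to every attached backing device of this connection.  If a
 * flush fails, write ordering is degraded to "drain" instead of retrying. */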
static void drbd_flush(struct drbd_tconn *tconn)
{
        int rv;
        struct drbd_conf *mdev;
        int vnr;

        if (tconn->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                        if (!get_ldev(mdev))
                                continue;
                        kref_get(&mdev->kref);
                        rcu_read_unlock();

                        rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        if (rv) {
                                dev_info(DEV, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(tconn, WO_drain_io);
                        }
                        put_ldev(mdev);
                        kref_put(&mdev->kref, &drbd_minor_destroy);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @tconn:      DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&tconn->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&tconn->epoch_lock);
                                drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
                                spin_lock(&tconn->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->tconn);
#endif

                        if (tconn->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                tconn->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&tconn->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @tconn:      DRBD connection.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
{
        struct disk_conf *dc;
        struct drbd_conf *mdev;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = tconn->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                if (!get_ldev_if_state(mdev, D_ATTACHING))
                        continue;
                dc = rcu_dereference(mdev->ldev->disk_conf);

                if (wo == WO_bdev_flush && !dc->disk_flushes)
                        wo = WO_drain_io;
                if (wo == WO_drain_io && !dc->disk_drain)
                        wo = WO_none;
                put_ldev(mdev);
        }
        rcu_read_unlock();
        tconn->write_ordering = wo;
        if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
                conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
}

/**
 * drbd_submit_peer_request() - Submit a peer request to the local backing device
 * @mdev:       DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyways,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                dev_err(DEV,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (unsigned long long)bio->bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(page == NULL);
        D_ASSERT(ds == 0);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(mdev, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}

static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
                                             struct drbd_peer_request *peer_req)
{
        struct drbd_interval *i = &peer_req->i;

        drbd_remove_interval(&mdev->write_requests, i);
        drbd_clear_interval(i);

        /* Wake up any processes waiting for this peer request to complete.  */
        if (i->waiting)
                wake_up(&mdev->misc_wait);
}

void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
{
        struct drbd_conf *mdev;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                kref_get(&mdev->kref);
                rcu_read_unlock();
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                kref_put(&mdev->kref, &drbd_minor_destroy);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

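/* Handle an incoming P_BARRIER packet: close the current write epoch and,
 * depending on the configured write ordering, flush or drain the backing
 * devices before installing (or recycling) the next epoch. */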
1415 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1416 {
1417         int rv;
1418         struct p_barrier *p = pi->data;
1419         struct drbd_epoch *epoch;
1420
1421         /* FIXME these are unacked on connection,
1422          * not a specific (peer)device.
1423          */
1424         tconn->current_epoch->barrier_nr = p->barrier;
1425         tconn->current_epoch->tconn = tconn;
1426         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1427
1428         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1429          * the activity log, which means it would not be resynced in case the
1430          * R_PRIMARY crashes now.
1431          * Therefore we must send the barrier_ack after the barrier request was
1432          * completed. */
1433         switch (tconn->write_ordering) {
1434         case WO_none:
1435                 if (rv == FE_RECYCLED)
1436                         return 0;
1437
1438                 /* receiver context, in the writeout path of the other node.
1439                  * avoid potential distributed deadlock */
1440                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1441                 if (epoch)
1442                         break;
1443                 else
1444                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1445                         /* Fall through */
1446
1447         case WO_bdev_flush:
1448         case WO_drain_io:
1449                 conn_wait_active_ee_empty(tconn);
1450                 drbd_flush(tconn);
1451
1452                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1453                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1454                         if (epoch)
1455                                 break;
1456                 }
1457
1458                 return 0;
1459         default:
1460                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1461                 return -EIO;
1462         }
1463
1464         epoch->flags = 0;
1465         atomic_set(&epoch->epoch_size, 0);
1466         atomic_set(&epoch->active, 0);
1467
1468         spin_lock(&tconn->epoch_lock);
1469         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1470                 list_add(&epoch->list, &tconn->current_epoch->list);
1471                 tconn->current_epoch = epoch;
1472                 tconn->epochs++;
1473         } else {
1474                 /* The current_epoch got recycled while we allocated this one... */
1475                 kfree(epoch);
1476         }
1477         spin_unlock(&tconn->epoch_lock);
1478
1479         return 0;
1480 }
1481
1482 /* used from receive_RSDataReply (recv_resync_read)
1483  * and from receive_Data */
1484 static struct drbd_peer_request *
1485 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1486               int data_size) __must_hold(local)
1487 {
1488         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1489         struct drbd_peer_request *peer_req;
1490         struct page *page;
1491         int dgs, ds, err;
1492         void *dig_in = mdev->tconn->int_dig_in;
1493         void *dig_vv = mdev->tconn->int_dig_vv;
1494         unsigned long *data;
1495
1496         dgs = 0;
1497         if (mdev->tconn->peer_integrity_tfm) {
1498                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1499                 /*
1500                  * FIXME: Receive the incoming digest into the receive buffer
1501                  *        here, together with its struct p_data?
1502                  */
1503                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1504                 if (err)
1505                         return NULL;
1506                 data_size -= dgs;
1507         }
1508
1509         if (!expect(IS_ALIGNED(data_size, 512)))
1510                 return NULL;
1511         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1512                 return NULL;
1513
1514         /* even though we trust our peer,
1515          * we sometimes have to double check. */
1516         if (sector + (data_size>>9) > capacity) {
1517                 dev_err(DEV, "request from peer beyond end of local disk: "
1518                         "capacity: %llus < sector: %llus + size: %u\n",
1519                         (unsigned long long)capacity,
1520                         (unsigned long long)sector, data_size);
1521                 return NULL;
1522         }
1523
1524         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1525          * "criss-cross" setup, that might cause write-out on some other DRBD,
1526          * which in turn might block on the other node at this very place.  */
1527         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1528         if (!peer_req)
1529                 return NULL;
1530
1531         if (!data_size)
1532                 return peer_req;
1533
1534         ds = data_size;
1535         page = peer_req->pages;
1536         page_chain_for_each(page) {
1537                 unsigned len = min_t(int, ds, PAGE_SIZE);
1538                 data = kmap(page);
1539                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1540                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1541                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1542                         data[0] = data[0] ^ (unsigned long)-1;
1543                 }
1544                 kunmap(page);
1545                 if (err) {
1546                         drbd_free_peer_req(mdev, peer_req);
1547                         return NULL;
1548                 }
1549                 ds -= len;
1550         }
1551
1552         if (dgs) {
1553                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1554                 if (memcmp(dig_in, dig_vv, dgs)) {
1555                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1556                                 (unsigned long long)sector, data_size);
1557                         drbd_free_peer_req(mdev, peer_req);
1558                         return NULL;
1559                 }
1560         }
1561         mdev->recv_cnt += data_size>>9;
1562         return peer_req;
1563 }
1564
1565 /* drbd_drain_block() just takes a data block
1566  * out of the socket input buffer, and discards it.
1567  */
1568 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1569 {
1570         struct page *page;
1571         int err = 0;
1572         void *data;
1573
1574         if (!data_size)
1575                 return 0;
1576
1577         page = drbd_alloc_pages(mdev, 1, 1);
1578
1579         data = kmap(page);
1580         while (data_size) {
1581                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1582
1583                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1584                 if (err)
1585                         break;
1586                 data_size -= len;
1587         }
1588         kunmap(page);
1589         drbd_free_pages(mdev, page, 0);
1590         return err;
1591 }
1592
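/*
 * recv_dless_read() receives the payload of a data reply directly into the
 * pages of the original read request's master bio ("diskless read"),
 * verifying the integrity digest if one is in use.
 */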
1593 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1594                            sector_t sector, int data_size)
1595 {
1596         struct bio_vec *bvec;
1597         struct bio *bio;
1598         int dgs, err, i, expect;
1599         void *dig_in = mdev->tconn->int_dig_in;
1600         void *dig_vv = mdev->tconn->int_dig_vv;
1601
1602         dgs = 0;
1603         if (mdev->tconn->peer_integrity_tfm) {
1604                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1605                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1606                 if (err)
1607                         return err;
1608                 data_size -= dgs;
1609         }
1610
1611         /* optimistically update recv_cnt.  if receiving fails below,
1612          * we disconnect anyways, and counters will be reset. */
1613         mdev->recv_cnt += data_size>>9;
1614
1615         bio = req->master_bio;
1616         D_ASSERT(sector == bio->bi_sector);
1617
1618         bio_for_each_segment(bvec, bio, i) {
1619                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1620                 expect = min_t(int, data_size, bvec->bv_len);
1621                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1622                 kunmap(bvec->bv_page);
1623                 if (err)
1624                         return err;
1625                 data_size -= expect;
1626         }
1627
1628         if (dgs) {
1629                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1630                 if (memcmp(dig_in, dig_vv, dgs)) {
1631                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1632                         return -EINVAL;
1633                 }
1634         }
1635
1636         D_ASSERT(data_size == 0);
1637         return 0;
1638 }
1639
1640 /*
1641  * e_end_resync_block() is called in asender context via
1642  * drbd_finish_peer_reqs().
1643  */
1644 static int e_end_resync_block(struct drbd_work *w, int unused)
1645 {
1646         struct drbd_peer_request *peer_req =
1647                 container_of(w, struct drbd_peer_request, w);
1648         struct drbd_conf *mdev = w->mdev;
1649         sector_t sector = peer_req->i.sector;
1650         int err;
1651
1652         D_ASSERT(drbd_interval_empty(&peer_req->i));
1653
1654         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1655                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1656                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1657         } else {
1658                 /* Record failure to sync */
1659                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1660
1661                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1662         }
1663         dec_unacked(mdev);
1664
1665         return err;
1666 }
1667
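/*
 * recv_resync_read() reads one resync data block from the peer into a freshly
 * allocated peer request and submits it as a local write; e_end_resync_block()
 * then sends the ack and marks the block in sync (or records the failure).
 */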
1668 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1669 {
1670         struct drbd_peer_request *peer_req;
1671
1672         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1673         if (!peer_req)
1674                 goto fail;
1675
1676         dec_rs_pending(mdev);
1677
1678         inc_unacked(mdev);
1679         /* corresponding dec_unacked() in e_end_resync_block(),
1680          * or in _drbd_clear_done_ee(), respectively */
1681
1682         peer_req->w.cb = e_end_resync_block;
1683
1684         spin_lock_irq(&mdev->tconn->req_lock);
1685         list_add(&peer_req->w.list, &mdev->sync_ee);
1686         spin_unlock_irq(&mdev->tconn->req_lock);
1687
1688         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1689         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1690                 return 0;
1691
1692         /* don't care for the reason here */
1693         dev_err(DEV, "submit failed, triggering re-connect\n");
1694         spin_lock_irq(&mdev->tconn->req_lock);
1695         list_del(&peer_req->w.list);
1696         spin_unlock_irq(&mdev->tconn->req_lock);
1697
1698         drbd_free_peer_req(mdev, peer_req);
1699 fail:
1700         put_ldev(mdev);
1701         return -EIO;
1702 }
1703
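/*
 * The block_id our peer echoes back to us is the kernel address of our own
 * request object.  Before dereferencing it, verify that the interval embedded
 * in that object is actually registered in @root at the expected sector.
 */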
1704 static struct drbd_request *
1705 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1706              sector_t sector, bool missing_ok, const char *func)
1707 {
1708         struct drbd_request *req;
1709
1710         /* Request object according to our peer */
1711         req = (struct drbd_request *)(unsigned long)id;
1712         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1713                 return req;
1714         if (!missing_ok) {
1715                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1716                         (unsigned long)id, (unsigned long long)sector);
1717         }
1718         return NULL;
1719 }
1720
1721 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1722 {
1723         struct drbd_conf *mdev;
1724         struct drbd_request *req;
1725         sector_t sector;
1726         int err;
1727         struct p_data *p = pi->data;
1728
1729         mdev = vnr_to_mdev(tconn, pi->vnr);
1730         if (!mdev)
1731                 return -EIO;
1732
1733         sector = be64_to_cpu(p->sector);
1734
1735         spin_lock_irq(&mdev->tconn->req_lock);
1736         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1737         spin_unlock_irq(&mdev->tconn->req_lock);
1738         if (unlikely(!req))
1739                 return -EIO;
1740
1741         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1742          * special casing it there for the various failure cases.
1743          * still no race with drbd_fail_pending_reads */
1744         err = recv_dless_read(mdev, req, sector, pi->size);
1745         if (!err)
1746                 req_mod(req, DATA_RECEIVED);
1747         /* else: nothing. handled from drbd_disconnect...
1748          * I don't think we may complete this just yet
1749          * in case we are "on-disconnect: freeze" */
1750
1751         return err;
1752 }
1753
1754 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1755 {
1756         struct drbd_conf *mdev;
1757         sector_t sector;
1758         int err;
1759         struct p_data *p = pi->data;
1760
1761         mdev = vnr_to_mdev(tconn, pi->vnr);
1762         if (!mdev)
1763                 return -EIO;
1764
1765         sector = be64_to_cpu(p->sector);
1766         D_ASSERT(p->block_id == ID_SYNCER);
1767
1768         if (get_ldev(mdev)) {
1769                 /* data is submitted to disk within recv_resync_read.
1770                  * corresponding put_ldev done below on error,
1771                  * or in drbd_peer_request_endio. */
1772                 err = recv_resync_read(mdev, sector, pi->size);
1773         } else {
1774                 if (__ratelimit(&drbd_ratelimit_state))
1775                         dev_err(DEV, "Can not write resync data to local disk.\n");
1776
1777                 err = drbd_drain_block(mdev, pi->size);
1778
1779                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1780         }
1781
1782         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1783
1784         return err;
1785 }
1786
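/*
 * Called with req_lock held: requeue local writes that were postponed because
 * they conflicted with the peer request that just completed; RQ_POSTPONED
 * requests end up on the retry workqueue via __req_mod(DISCARD_WRITE).
 */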
1787 static void restart_conflicting_writes(struct drbd_conf *mdev,
1788                                        sector_t sector, int size)
1789 {
1790         struct drbd_interval *i;
1791         struct drbd_request *req;
1792
1793         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1794                 if (!i->local)
1795                         continue;
1796                 req = container_of(i, struct drbd_request, i);
1797                 if (req->rq_state & RQ_LOCAL_PENDING ||
1798                     !(req->rq_state & RQ_POSTPONED))
1799                         continue;
1800                 /* as it is RQ_POSTPONED, this will cause it to
1801                  * be queued on the retry workqueue. */
1802                 __req_mod(req, DISCARD_WRITE, NULL);
1803         }
1804 }
1805
1806 /*
1807  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1808  */
1809 static int e_end_block(struct drbd_work *w, int cancel)
1810 {
1811         struct drbd_peer_request *peer_req =
1812                 container_of(w, struct drbd_peer_request, w);
1813         struct drbd_conf *mdev = w->mdev;
1814         sector_t sector = peer_req->i.sector;
1815         int err = 0, pcmd;
1816
1817         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1818                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1819                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1820                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1821                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1822                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1823                         err = drbd_send_ack(mdev, pcmd, peer_req);
1824                         if (pcmd == P_RS_WRITE_ACK)
1825                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1826                 } else {
1827                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1828                         /* we expect it to be marked out of sync anyways...
1829                          * maybe assert this?  */
1830                 }
1831                 dec_unacked(mdev);
1832         }
1833         /* we delete from the conflict detection hash _after_ we sent out the
1834          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1835         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1836                 spin_lock_irq(&mdev->tconn->req_lock);
1837                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1838                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1839                 if (peer_req->flags & EE_RESTART_REQUESTS)
1840                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1841                 spin_unlock_irq(&mdev->tconn->req_lock);
1842         } else
1843                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1844
1845         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1846
1847         return err;
1848 }
1849
1850 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1851 {
1852         struct drbd_conf *mdev = w->mdev;
1853         struct drbd_peer_request *peer_req =
1854                 container_of(w, struct drbd_peer_request, w);
1855         int err;
1856
1857         err = drbd_send_ack(mdev, ack, peer_req);
1858         dec_unacked(mdev);
1859
1860         return err;
1861 }
1862
1863 static int e_send_discard_write(struct drbd_work *w, int unused)
1864 {
1865         return e_send_ack(w, P_DISCARD_WRITE);
1866 }
1867
1868 static int e_send_retry_write(struct drbd_work *w, int unused)
1869 {
1870         struct drbd_tconn *tconn = w->mdev->tconn;
1871
1872         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1873                              P_RETRY_WRITE : P_DISCARD_WRITE);
1874 }
1875
1876 static bool seq_greater(u32 a, u32 b)
1877 {
1878         /*
1879          * We assume 32-bit wrap-around here.
1880          * For 24-bit wrap-around, we would have to shift:
1881          *  a <<= 8; b <<= 8;
1882          */
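        /* e.g. seq_greater(1, 0xffffffff) is true: (s32)1 - (s32)0xffffffff == 2,
         * so a sequence number that has just wrapped still compares as newer. */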
1883         return (s32)a - (s32)b > 0;
1884 }
1885
1886 static u32 seq_max(u32 a, u32 b)
1887 {
1888         return seq_greater(a, b) ? a : b;
1889 }
1890
1891 static bool need_peer_seq(struct drbd_conf *mdev)
1892 {
1893         struct drbd_tconn *tconn = mdev->tconn;
1894         int tp;
1895
1896         /*
1897          * We only need to keep track of the last packet_seq number of our peer
1898          * if we are in dual-primary mode and we have the discard flag set; see
1899          * handle_write_conflicts().
1900          */
1901
1902         rcu_read_lock();
1903         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1904         rcu_read_unlock();
1905
1906         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1907 }
1908
1909 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1910 {
1911         unsigned int newest_peer_seq;
1912
1913         if (need_peer_seq(mdev)) {
1914                 spin_lock(&mdev->peer_seq_lock);
1915                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1916                 mdev->peer_seq = newest_peer_seq;
1917                 spin_unlock(&mdev->peer_seq_lock);
1918                 /* wake up only if we actually changed mdev->peer_seq */
1919                 if (peer_seq == newest_peer_seq)
1920                         wake_up(&mdev->seq_wait);
1921         }
1922 }
1923
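/* s1, s2 are start sectors; l1, l2 are lengths in bytes, hence the >>9.
 * Two ranges overlap unless one of them ends before the other one starts. */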
1924 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1925 {
1926         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1927 }
1928
1929 /* maybe change sync_ee into interval trees as well? */
1930 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1931 {
1932         struct drbd_peer_request *rs_req;
1933         bool rv = 0;
1934
1935         spin_lock_irq(&mdev->tconn->req_lock);
1936         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1937                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1938                              rs_req->i.sector, rs_req->i.size)) {
1939                         rv = 1;
1940                         break;
1941                 }
1942         }
1943         spin_unlock_irq(&mdev->tconn->req_lock);
1944
1945         return rv;
1946 }
1947
1948 /* Called from receive_Data.
1949  * Synchronize packets on sock with packets on msock.
1950  *
1951  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1952  * packet traveling on msock, they are still processed in the order they have
1953  * been sent.
1954  *
1955  * Note: we don't care for Ack packets overtaking P_DATA packets.
1956  *
1957  * In case packet_seq is larger than mdev->peer_seq number, there are
1958  * outstanding packets on the msock. We wait for them to arrive.
1959  * In case we are the logically next packet, we update mdev->peer_seq
1960  * ourselves. Correctly handles 32bit wrap around.
1961  *
1962  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1963  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1964  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1965  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1966  *
1967  * returns 0 if we may process the packet,
1968  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1969 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1970 {
1971         DEFINE_WAIT(wait);
1972         long timeout;
1973         int ret;
1974
1975         if (!need_peer_seq(mdev))
1976                 return 0;
1977
1978         spin_lock(&mdev->peer_seq_lock);
1979         for (;;) {
1980                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1981                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1982                         ret = 0;
1983                         break;
1984                 }
1985                 if (signal_pending(current)) {
1986                         ret = -ERESTARTSYS;
1987                         break;
1988                 }
1989                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1990                 spin_unlock(&mdev->peer_seq_lock);
1991                 rcu_read_lock();
1992                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1993                 rcu_read_unlock();
1994                 timeout = schedule_timeout(timeout);
1995                 spin_lock(&mdev->peer_seq_lock);
1996                 if (!timeout) {
1997                         ret = -ETIMEDOUT;
1998                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1999                         break;
2000                 }
2001         }
2002         spin_unlock(&mdev->peer_seq_lock);
2003         finish_wait(&mdev->seq_wait, &wait);
2004         return ret;
2005 }
2006
2007 /* see also bio_flags_to_wire():
2008  * we need to semantically map the bio (DRBD_)REQ_* flags to data packet DP_*
2009  * flags and back, because we may replicate to other kernel versions. */
2010 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2011 {
2012         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2013                 (dpf & DP_FUA ? REQ_FUA : 0) |
2014                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2015                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2016 }
2017
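/*
 * Called with req_lock held: fail (NEG_ACKED) every postponed local request
 * that overlaps [sector, sector + size).  The lock is dropped while the master
 * bio is completed, so the tree walk is restarted after each hit.
 */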
2018 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2019                                     unsigned int size)
2020 {
2021         struct drbd_interval *i;
2022
2023     repeat:
2024         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2025                 struct drbd_request *req;
2026                 struct bio_and_error m;
2027
2028                 if (!i->local)
2029                         continue;
2030                 req = container_of(i, struct drbd_request, i);
2031                 if (!(req->rq_state & RQ_POSTPONED))
2032                         continue;
2033                 req->rq_state &= ~RQ_POSTPONED;
2034                 __req_mod(req, NEG_ACKED, &m);
2035                 spin_unlock_irq(&mdev->tconn->req_lock);
2036                 if (m.bio)
2037                         complete_master_bio(mdev, &m);
2038                 spin_lock_irq(&mdev->tconn->req_lock);
2039                 goto repeat;
2040         }
2041 }
2042
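/*
 * Called with req_lock held.  Insert the peer request into the write_requests
 * tree and resolve conflicts with overlapping requests: the node that owns the
 * "discard flag" decides to discard or retry the conflicting write (queueing
 * the ack and returning -ENOENT), the other node waits for that decision.
 * Returns 0 if the peer request may be submitted.
 */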
2043 static int handle_write_conflicts(struct drbd_conf *mdev,
2044                                   struct drbd_peer_request *peer_req)
2045 {
2046         struct drbd_tconn *tconn = mdev->tconn;
2047         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
2048         sector_t sector = peer_req->i.sector;
2049         const unsigned int size = peer_req->i.size;
2050         struct drbd_interval *i;
2051         bool equal;
2052         int err;
2053
2054         /*
2055          * Inserting the peer request into the write_requests tree will prevent
2056          * new conflicting local requests from being added.
2057          */
2058         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2059
2060     repeat:
2061         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2062                 if (i == &peer_req->i)
2063                         continue;
2064
2065                 if (!i->local) {
2066                         /*
2067                          * Our peer has sent a conflicting remote request; this
2068                          * should not happen in a two-node setup.  Wait for the
2069                          * earlier peer request to complete.
2070                          */
2071                         err = drbd_wait_misc(mdev, i);
2072                         if (err)
2073                                 goto out;
2074                         goto repeat;
2075                 }
2076
2077                 equal = i->sector == sector && i->size == size;
2078                 if (resolve_conflicts) {
2079                         /*
2080                          * If the peer request is fully contained within the
2081                          * overlapping request, it can be discarded; otherwise,
2082                          * it will be retried once all overlapping requests
2083                          * have completed.
2084                          */
2085                         bool discard = i->sector <= sector && i->sector +
2086                                        (i->size >> 9) >= sector + (size >> 9);
2087
2088                         if (!equal)
2089                                 dev_alert(DEV, "Concurrent writes detected: "
2090                                                "local=%llus +%u, remote=%llus +%u, "
2091                                                "assuming %s came first\n",
2092                                           (unsigned long long)i->sector, i->size,
2093                                           (unsigned long long)sector, size,
2094                                           discard ? "local" : "remote");
2095
2096                         inc_unacked(mdev);
2097                         peer_req->w.cb = discard ? e_send_discard_write :
2098                                                    e_send_retry_write;
2099                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2100                         wake_asender(mdev->tconn);
2101
2102                         err = -ENOENT;
2103                         goto out;
2104                 } else {
2105                         struct drbd_request *req =
2106                                 container_of(i, struct drbd_request, i);
2107
2108                         if (!equal)
2109                                 dev_alert(DEV, "Concurrent writes detected: "
2110                                                "local=%llus +%u, remote=%llus +%u\n",
2111                                           (unsigned long long)i->sector, i->size,
2112                                           (unsigned long long)sector, size);
2113
2114                         if (req->rq_state & RQ_LOCAL_PENDING ||
2115                             !(req->rq_state & RQ_POSTPONED)) {
2116                                 /*
2117                                  * Wait for the node with the discard flag to
2118                                  * decide if this request will be discarded or
2119                                  * retried.  Requests that are discarded will
2120                                  * disappear from the write_requests tree.
2121                                  *
2122                                  * In addition, wait for the conflicting
2123                                  * request to finish locally before submitting
2124                                  * the conflicting peer request.
2125                                  */
2126                                 err = drbd_wait_misc(mdev, &req->i);
2127                                 if (err) {
2128                                         _conn_request_state(mdev->tconn,
2129                                                             NS(conn, C_TIMEOUT),
2130                                                             CS_HARD);
2131                                         fail_postponed_requests(mdev, sector, size);
2132                                         goto out;
2133                                 }
2134                                 goto repeat;
2135                         }
2136                         /*
2137                          * Remember to restart the conflicting requests after
2138                          * the new peer request has completed.
2139                          */
2140                         peer_req->flags |= EE_RESTART_REQUESTS;
2141                 }
2142         }
2143         err = 0;
2144
2145     out:
2146         if (err)
2147                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2148         return err;
2149 }
2150
2151 /* mirrored write: a P_DATA packet carrying an application write from the peer */
2152 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2153 {
2154         struct drbd_conf *mdev;
2155         sector_t sector;
2156         struct drbd_peer_request *peer_req;
2157         struct p_data *p = pi->data;
2158         u32 peer_seq = be32_to_cpu(p->seq_num);
2159         int rw = WRITE;
2160         u32 dp_flags;
2161         int err, tp;
2162
2163         mdev = vnr_to_mdev(tconn, pi->vnr);
2164         if (!mdev)
2165                 return -EIO;
2166
2167         if (!get_ldev(mdev)) {
2168                 int err2;
2169
2170                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2171                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2172                 atomic_inc(&tconn->current_epoch->epoch_size);
2173                 err2 = drbd_drain_block(mdev, pi->size);
2174                 if (!err)
2175                         err = err2;
2176                 return err;
2177         }
2178
2179         /*
2180          * Corresponding put_ldev done either below (on various errors), or in
2181          * drbd_peer_request_endio, if we successfully submit the data at the
2182          * end of this function.
2183          */
2184
2185         sector = be64_to_cpu(p->sector);
2186         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2187         if (!peer_req) {
2188                 put_ldev(mdev);
2189                 return -EIO;
2190         }
2191
2192         peer_req->w.cb = e_end_block;
2193
2194         dp_flags = be32_to_cpu(p->dp_flags);
2195         rw |= wire_flags_to_bio(mdev, dp_flags);
2196         if (peer_req->pages == NULL) {
2197                 D_ASSERT(peer_req->i.size == 0);
2198                 D_ASSERT(dp_flags & DP_FLUSH);
2199         }
2200
2201         if (dp_flags & DP_MAY_SET_IN_SYNC)
2202                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2203
2204         spin_lock(&tconn->epoch_lock);
2205         peer_req->epoch = tconn->current_epoch;
2206         atomic_inc(&peer_req->epoch->epoch_size);
2207         atomic_inc(&peer_req->epoch->active);
2208         spin_unlock(&tconn->epoch_lock);
2209
2210         rcu_read_lock();
2211         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2212         rcu_read_unlock();
2213         if (tp) {
2214                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2215                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2216                 if (err)
2217                         goto out_interrupted;
2218                 spin_lock_irq(&mdev->tconn->req_lock);
2219                 err = handle_write_conflicts(mdev, peer_req);
2220                 if (err) {
2221                         spin_unlock_irq(&mdev->tconn->req_lock);
2222                         if (err == -ENOENT) {
2223                                 put_ldev(mdev);
2224                                 return 0;
2225                         }
2226                         goto out_interrupted;
2227                 }
2228         } else
2229                 spin_lock_irq(&mdev->tconn->req_lock);
2230         list_add(&peer_req->w.list, &mdev->active_ee);
2231         spin_unlock_irq(&mdev->tconn->req_lock);
2232
2233         if (mdev->state.conn == C_SYNC_TARGET)
2234                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2235
2236         if (mdev->tconn->agreed_pro_version < 100) {
2237                 rcu_read_lock();
2238                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2239                 case DRBD_PROT_C:
2240                         dp_flags |= DP_SEND_WRITE_ACK;
2241                         break;
2242                 case DRBD_PROT_B:
2243                         dp_flags |= DP_SEND_RECEIVE_ACK;
2244                         break;
2245                 }
2246                 rcu_read_unlock();
2247         }
2248
2249         if (dp_flags & DP_SEND_WRITE_ACK) {
2250                 peer_req->flags |= EE_SEND_WRITE_ACK;
2251                 inc_unacked(mdev);
2252                 /* corresponding dec_unacked() in e_end_block(),
2253                  * or in _drbd_clear_done_ee(), respectively */
2254         }
2255
2256         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2257                 /* I really don't like it that the receiver thread
2258                  * sends on the msock, but anyways */
2259                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2260         }
2261
2262         if (mdev->state.pdsk < D_INCONSISTENT) {
2263                 /* In case we have the only disk of the cluster, mark the range out of sync for the peer and cover it in the AL. */
2264                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2265                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2266                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2267                 drbd_al_begin_io(mdev, &peer_req->i);
2268         }
2269
2270         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2271         if (!err)
2272                 return 0;
2273
2274         /* don't care for the reason here */
2275         dev_err(DEV, "submit failed, triggering re-connect\n");
2276         spin_lock_irq(&mdev->tconn->req_lock);
2277         list_del(&peer_req->w.list);
2278         drbd_remove_epoch_entry_interval(mdev, peer_req);
2279         spin_unlock_irq(&mdev->tconn->req_lock);
2280         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2281                 drbd_al_complete_io(mdev, &peer_req->i);
2282
2283 out_interrupted:
2284         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2285         put_ldev(mdev);
2286         drbd_free_peer_req(mdev, peer_req);
2287         return err;
2288 }
2289
2290 /* We may throttle resync, if the lower device seems to be busy,
2291  * and current sync rate is above c_min_rate.
2292  *
2293  * To decide whether or not the lower device is busy, we use a scheme similar
2294  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2295  * (more than 64 sectors) of activity we cannot account for with our own resync
2296  * activity, it obviously is "busy".
2297  *
2298  * The current sync rate used here uses only the most recent two step marks,
2299  * to have a short time average so we can react faster.
2300  */
2301 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2302 {
2303         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2304         unsigned long db, dt, dbdt;
2305         struct lc_element *tmp;
2306         int curr_events;
2307         int throttle = 0;
2308         unsigned int c_min_rate;
2309
2310         rcu_read_lock();
2311         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2312         rcu_read_unlock();
2313
2314         /* feature disabled? */
2315         if (c_min_rate == 0)
2316                 return 0;
2317
2318         spin_lock_irq(&mdev->al_lock);
2319         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2320         if (tmp) {
2321                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2322                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2323                         spin_unlock_irq(&mdev->al_lock);
2324                         return 0;
2325                 }
2326                 /* Do not slow down if app IO is already waiting for this extent */
2327         }
2328         spin_unlock_irq(&mdev->al_lock);
2329
2330         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2331                       (int)part_stat_read(&disk->part0, sectors[1]) -
2332                         atomic_read(&mdev->rs_sect_ev);
2333
2334         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2335                 unsigned long rs_left;
2336                 int i;
2337
2338                 mdev->rs_last_events = curr_events;
2339
2340                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2341                  * approx. */
2342                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2343
2344                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2345                         rs_left = mdev->ov_left;
2346                 else
2347                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2348
2349                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2350                 if (!dt)
2351                         dt++;
2352                 db = mdev->rs_mark_left[i] - rs_left;
2353                 dbdt = Bit2KB(db/dt);
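                /* db is in bitmap bits, dt in seconds; Bit2KB() converts bits to KiB,
                 * so dbdt is the recent resync rate in KiB/s, the unit of c_min_rate. */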
2354
2355                 if (dbdt > c_min_rate)
2356                         throttle = 1;
2357         }
2358         return throttle;
2359 }
2360
2361
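/*
 * receive_DataRequest() serves the peer's read-style requests: P_DATA_REQUEST
 * (application read on behalf of the peer), P_RS_DATA_REQUEST, P_CSUM_RS_REQUEST,
 * P_OV_REQUEST and P_OV_REPLY.  The requested block is read from the local disk
 * into a peer request, and the matching w_e_end_*() callback sends the reply
 * from worker context.
 */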
2362 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2363 {
2364         struct drbd_conf *mdev;
2365         sector_t sector;
2366         sector_t capacity;
2367         struct drbd_peer_request *peer_req;
2368         struct digest_info *di = NULL;
2369         int size, verb;
2370         unsigned int fault_type;
2371         struct p_block_req *p = pi->data;
2372
2373         mdev = vnr_to_mdev(tconn, pi->vnr);
2374         if (!mdev)
2375                 return -EIO;
2376         capacity = drbd_get_capacity(mdev->this_bdev);
2377
2378         sector = be64_to_cpu(p->sector);
2379         size   = be32_to_cpu(p->blksize);
2380
2381         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2382                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2383                                 (unsigned long long)sector, size);
2384                 return -EINVAL;
2385         }
2386         if (sector + (size>>9) > capacity) {
2387                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2388                                 (unsigned long long)sector, size);
2389                 return -EINVAL;
2390         }
2391
2392         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2393                 verb = 1;
2394                 switch (pi->cmd) {
2395                 case P_DATA_REQUEST:
2396                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2397                         break;
2398                 case P_RS_DATA_REQUEST:
2399                 case P_CSUM_RS_REQUEST:
2400                 case P_OV_REQUEST:
2401                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2402                         break;
2403                 case P_OV_REPLY:
2404                         verb = 0;
2405                         dec_rs_pending(mdev);
2406                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2407                         break;
2408                 default:
2409                         BUG();
2410                 }
2411                 if (verb && __ratelimit(&drbd_ratelimit_state))
2412                         dev_err(DEV, "Can not satisfy peer's read request, "
2413                             "no local data.\n");
2414
2415                 /* drain the payload, if any */
2416                 return drbd_drain_block(mdev, pi->size);
2417         }
2418
2419         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2420          * "criss-cross" setup, that might cause write-out on some other DRBD,
2421          * which in turn might block on the other node at this very place.  */
2422         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2423         if (!peer_req) {
2424                 put_ldev(mdev);
2425                 return -ENOMEM;
2426         }
2427
2428         switch (pi->cmd) {
2429         case P_DATA_REQUEST:
2430                 peer_req->w.cb = w_e_end_data_req;
2431                 fault_type = DRBD_FAULT_DT_RD;
2432                 /* application IO, don't drbd_rs_begin_io */
2433                 goto submit;
2434
2435         case P_RS_DATA_REQUEST:
2436                 peer_req->w.cb = w_e_end_rsdata_req;
2437                 fault_type = DRBD_FAULT_RS_RD;
2438                 /* used in the sector offset progress display */
2439                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2440                 break;
2441
2442         case P_OV_REPLY:
2443         case P_CSUM_RS_REQUEST:
2444                 fault_type = DRBD_FAULT_RS_RD;
2445                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2446                 if (!di)
2447                         goto out_free_e;
2448
2449                 di->digest_size = pi->size;
2450                 di->digest = (((char *)di)+sizeof(struct digest_info));
2451
2452                 peer_req->digest = di;
2453                 peer_req->flags |= EE_HAS_DIGEST;
2454
2455                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2456                         goto out_free_e;
2457
2458                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2459                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2460                         peer_req->w.cb = w_e_end_csum_rs_req;
2461                         /* used in the sector offset progress display */
2462                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2463                 } else if (pi->cmd == P_OV_REPLY) {
2464                         /* track progress, we may need to throttle */
2465                         atomic_add(size >> 9, &mdev->rs_sect_in);
2466                         peer_req->w.cb = w_e_end_ov_reply;
2467                         dec_rs_pending(mdev);
2468                         /* drbd_rs_begin_io done when we sent this request,
2469                          * but accounting still needs to be done. */
2470                         goto submit_for_resync;
2471                 }
2472                 break;
2473
2474         case P_OV_REQUEST:
2475                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2476                     mdev->tconn->agreed_pro_version >= 90) {
2477                         unsigned long now = jiffies;
2478                         int i;
2479                         mdev->ov_start_sector = sector;
2480                         mdev->ov_position = sector;
2481                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2482                         mdev->rs_total = mdev->ov_left;
2483                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2484                                 mdev->rs_mark_left[i] = mdev->ov_left;
2485                                 mdev->rs_mark_time[i] = now;
2486                         }
2487                         dev_info(DEV, "Online Verify start sector: %llu\n",
2488                                         (unsigned long long)sector);
2489                 }
2490                 peer_req->w.cb = w_e_end_ov_req;
2491                 fault_type = DRBD_FAULT_RS_RD;
2492                 break;
2493
2494         default:
2495                 BUG();
2496         }
2497
2498         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2499          * wrt the receiver, but it is not as straightforward as it may seem.
2500          * Various places in the resync start and stop logic assume resync
2501          * requests are processed in order, requeuing this on the worker thread
2502          * introduces a bunch of new code for synchronization between threads.
2503          *
2504          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2505          * "forever", throttling after drbd_rs_begin_io will lock that extent
2506          * for application writes for the same time.  For now, just throttle
2507          * here, where the rest of the code expects the receiver to sleep for
2508          * a while, anyways.
2509          */
2510
2511         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2512          * this defers syncer requests for some time, before letting at least
2513          * one request through.  The resync controller on the receiving side
2514          * will adapt to the incoming rate accordingly.
2515          *
2516          * We cannot throttle here if remote is Primary/SyncTarget:
2517          * we would also throttle its application reads.
2518          * In that case, throttling is done on the SyncTarget only.
2519          */
2520         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2521                 schedule_timeout_uninterruptible(HZ/10);
2522         if (drbd_rs_begin_io(mdev, sector))
2523                 goto out_free_e;
2524
2525 submit_for_resync:
2526         atomic_add(size >> 9, &mdev->rs_sect_ev);
2527
2528 submit:
2529         inc_unacked(mdev);
2530         spin_lock_irq(&mdev->tconn->req_lock);
2531         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2532         spin_unlock_irq(&mdev->tconn->req_lock);
2533
2534         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2535                 return 0;
2536
2537         /* don't care for the reason here */
2538         dev_err(DEV, "submit failed, triggering re-connect\n");
2539         spin_lock_irq(&mdev->tconn->req_lock);
2540         list_del(&peer_req->w.list);
2541         spin_unlock_irq(&mdev->tconn->req_lock);
2542         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2543
2544 out_free_e:
2545         put_ldev(mdev);
2546         drbd_free_peer_req(mdev, peer_req);
2547         return -EIO;
2548 }
2549
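/*
 * After-split-brain auto recovery for the "zero primaries" case (after-sb-0pri
 * policy).  Return convention, shared with the 1p/2p variants below:
 *    1  keep our data, the peer becomes sync target
 *   -1  discard our data, we become sync target
 * -100  no automatic resolution possible
 */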
2550 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2551 {
2552         int self, peer, rv = -100;
2553         unsigned long ch_self, ch_peer;
2554         enum drbd_after_sb_p after_sb_0p;
2555
2556         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2557         peer = mdev->p_uuid[UI_BITMAP] & 1;
2558
2559         ch_peer = mdev->p_uuid[UI_SIZE];
2560         ch_self = mdev->comm_bm_set;
2561
2562         rcu_read_lock();
2563         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2564         rcu_read_unlock();
2565         switch (after_sb_0p) {
2566         case ASB_CONSENSUS:
2567         case ASB_DISCARD_SECONDARY:
2568         case ASB_CALL_HELPER:
2569         case ASB_VIOLENTLY:
2570                 dev_err(DEV, "Configuration error.\n");
2571                 break;
2572         case ASB_DISCONNECT:
2573                 break;
2574         case ASB_DISCARD_YOUNGER_PRI:
2575                 if (self == 0 && peer == 1) {
2576                         rv = -1;
2577                         break;
2578                 }
2579                 if (self == 1 && peer == 0) {
2580                         rv =  1;
2581                         break;
2582                 }
2583                 /* Else fall through to one of the other strategies... */
2584         case ASB_DISCARD_OLDER_PRI:
2585                 if (self == 0 && peer == 1) {
2586                         rv = 1;
2587                         break;
2588                 }
2589                 if (self == 1 && peer == 0) {
2590                         rv = -1;
2591                         break;
2592                 }
2593                 /* Else fall through to one of the other strategies... */
2594                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2595                      "Using discard-least-changes instead\n");
2596         case ASB_DISCARD_ZERO_CHG:
2597                 if (ch_peer == 0 && ch_self == 0) {
2598                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2599                                 ? -1 : 1;
2600                         break;
2601                 } else {
2602                         if (ch_peer == 0) { rv =  1; break; }
2603                         if (ch_self == 0) { rv = -1; break; }
2604                 }
2605                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2606                         break;
2607         case ASB_DISCARD_LEAST_CHG:
2608                 if      (ch_self < ch_peer)
2609                         rv = -1;
2610                 else if (ch_self > ch_peer)
2611                         rv =  1;
2612                 else /* ( ch_self == ch_peer ) */
2613                      /* Well, then use something else. */
2614                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2615                                 ? -1 : 1;
2616                 break;
2617         case ASB_DISCARD_LOCAL:
2618                 rv = -1;
2619                 break;
2620         case ASB_DISCARD_REMOTE:
2621                 rv =  1;
2622         }
2623
2624         return rv;
2625 }
2626
2627 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2628 {
2629         int hg, rv = -100;
2630         enum drbd_after_sb_p after_sb_1p;
2631
2632         rcu_read_lock();
2633         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2634         rcu_read_unlock();
2635         switch (after_sb_1p) {
2636         case ASB_DISCARD_YOUNGER_PRI:
2637         case ASB_DISCARD_OLDER_PRI:
2638         case ASB_DISCARD_LEAST_CHG:
2639         case ASB_DISCARD_LOCAL:
2640         case ASB_DISCARD_REMOTE:
2641         case ASB_DISCARD_ZERO_CHG:
2642                 dev_err(DEV, "Configuration error.\n");
2643                 break;
2644         case ASB_DISCONNECT:
2645                 break;
2646         case ASB_CONSENSUS:
2647                 hg = drbd_asb_recover_0p(mdev);
2648                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2649                         rv = hg;
2650                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2651                         rv = hg;
2652                 break;
2653         case ASB_VIOLENTLY:
2654                 rv = drbd_asb_recover_0p(mdev);
2655                 break;
2656         case ASB_DISCARD_SECONDARY:
2657                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2658         case ASB_CALL_HELPER:
2659                 hg = drbd_asb_recover_0p(mdev);
2660                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2661                         enum drbd_state_rv rv2;
2662
2663                         drbd_set_role(mdev, R_SECONDARY, 0);
2664                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2665                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2666                           * we do not need to wait for the after state change work either. */
2667                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2668                         if (rv2 != SS_SUCCESS) {
2669                                 drbd_khelper(mdev, "pri-lost-after-sb");
2670                         } else {
2671                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2672                                 rv = hg;
2673                         }
2674                 } else
2675                         rv = hg;
2676         }
2677
2678         return rv;
2679 }
2680
2681 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2682 {
2683         int hg, rv = -100;
2684         enum drbd_after_sb_p after_sb_2p;
2685
2686         rcu_read_lock();
2687         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2688         rcu_read_unlock();
2689         switch (after_sb_2p) {
2690         case ASB_DISCARD_YOUNGER_PRI:
2691         case ASB_DISCARD_OLDER_PRI:
2692         case ASB_DISCARD_LEAST_CHG:
2693         case ASB_DISCARD_LOCAL:
2694         case ASB_DISCARD_REMOTE:
2695         case ASB_CONSENSUS:
2696         case ASB_DISCARD_SECONDARY:
2697         case ASB_DISCARD_ZERO_CHG:
2698                 dev_err(DEV, "Configuration error.\n");
2699                 break;
2700         case ASB_VIOLENTLY:
2701                 rv = drbd_asb_recover_0p(mdev);
2702                 break;
2703         case ASB_DISCONNECT:
2704                 break;
2705         case ASB_CALL_HELPER:
2706                 hg = drbd_asb_recover_0p(mdev);
2707                 if (hg == -1) {
2708                         enum drbd_state_rv rv2;
2709
2710                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2711                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2712                           * we do not need to wait for the after state change work either. */
2713                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2714                         if (rv2 != SS_SUCCESS) {
2715                                 drbd_khelper(mdev, "pri-lost-after-sb");
2716                         } else {
2717                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2718                                 rv = hg;
2719                         }
2720                 } else
2721                         rv = hg;
2722         }
2723
2724         return rv;
2725 }
2726
2727 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2728                            u64 bits, u64 flags)
2729 {
2730         if (!uuid) {
2731                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2732                 return;
2733         }
2734         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2735              text,
2736              (unsigned long long)uuid[UI_CURRENT],
2737              (unsigned long long)uuid[UI_BITMAP],
2738              (unsigned long long)uuid[UI_HISTORY_START],
2739              (unsigned long long)uuid[UI_HISTORY_END],
2740              (unsigned long long)bits,
2741              (unsigned long long)flags);
2742 }
2743
2744 /*
2745   100   after split brain try auto recover
2746     2   C_SYNC_SOURCE set BitMap
2747     1   C_SYNC_SOURCE use BitMap
2748     0   no Sync
2749    -1   C_SYNC_TARGET use BitMap
2750    -2   C_SYNC_TARGET set BitMap
2751  -100   after split brain, disconnect
2752 -1000   unrelated data
2753 -1091   requires proto 91
2754 -1096   requires proto 96
2755  */
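/* Note: the lowest bit of each UUID carries a flag (cf. the "& 1" tests in
 * drbd_asb_recover_0p()), so it is masked off before UUIDs are compared. */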
2756 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2757 {
2758         u64 self, peer;
2759         int i, j;
2760
2761         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2762         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2763
2764         *rule_nr = 10;
2765         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2766                 return 0;
2767
2768         *rule_nr = 20;
2769         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2770              peer != UUID_JUST_CREATED)
2771                 return -2;
2772
2773         *rule_nr = 30;
2774         if (self != UUID_JUST_CREATED &&
2775             (peer == UUID_JUST_CREATED || peer == (u64)0))
2776                 return 2;
2777
2778         if (self == peer) {
2779                 int rct, dc; /* roles at crash time */
2780
2781                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2782
2783                         if (mdev->tconn->agreed_pro_version < 91)
2784                                 return -1091;
2785
2786                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2787                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2788                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2789                                 drbd_uuid_set_bm(mdev, 0UL);
2790
2791                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2792                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2793                                 *rule_nr = 34;
2794                         } else {
2795                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2796                                 *rule_nr = 36;
2797                         }
2798
2799                         return 1;
2800                 }
2801
2802                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2803
2804                         if (mdev->tconn->agreed_pro_version < 91)
2805                                 return -1091;
2806
2807                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2808                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2809                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2810
2811                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2812                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2813                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2814
2815                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2816                                 *rule_nr = 35;
2817                         } else {
2818                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2819                                 *rule_nr = 37;
2820                         }
2821
2822                         return -1;
2823                 }
2824
2825                 /* Common power [off|failure] */
2826                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2827                         (mdev->p_uuid[UI_FLAGS] & 2);
2828                 /* lowest bit is set when we were primary,
2829                  * next bit (weight 2) is set when peer was primary */
2830                 *rule_nr = 40;
2831
2832                 switch (rct) {
2833                 case 0: /* !self_pri && !peer_pri */ return 0;
2834                 case 1: /*  self_pri && !peer_pri */ return 1;
2835                 case 2: /* !self_pri &&  peer_pri */ return -1;
2836                 case 3: /*  self_pri &&  peer_pri */
2837                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2838                         return dc ? -1 : 1;
2839                 }
2840         }
2841
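        /* Our current UUID shows up in the peer's bitmap or history UUIDs:
         * the peer has newer data, we become sync target.  Rule 51 additionally
         * repairs the peer's UUIDs if the last P_SYNC_UUID packet was lost. */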
2842         *rule_nr = 50;
2843         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2844         if (self == peer)
2845                 return -1;
2846
2847         *rule_nr = 51;
2848         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2849         if (self == peer) {
2850                 if (mdev->tconn->agreed_pro_version < 96 ?
2851                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2852                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2853                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2854                         /* The last P_SYNC_UUID did not get through. Undo the peer's
2855                            UUID modifications from the last start of resync as sync source. */
2856
2857                         if (mdev->tconn->agreed_pro_version < 91)
2858                                 return -1091;
2859
2860                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2861                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2862
2863                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2864                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2865
2866                         return -1;
2867                 }
2868         }
2869
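        /* Our current UUID only shows up in the peer's history: the peer's data
         * is ahead of ours, full sync as target (rule 60).  Rules 70/71/80 are
         * the mirror-image checks with the roles of self and peer swapped. */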
2870         *rule_nr = 60;
2871         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2872         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2873                 peer = mdev->p_uuid[i] & ~((u64)1);
2874                 if (self == peer)
2875                         return -2;
2876         }
2877
2878         *rule_nr = 70;
2879         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2880         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2881         if (self == peer)
2882                 return 1;
2883
2884         *rule_nr = 71;
2885         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2886         if (self == peer) {
2887                 if (mdev->tconn->agreed_pro_version < 96 ?
2888                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2889                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2890                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2891                         /* The last P_SYNC_UUID did not get through. Undo our own
2892                            UUID modifications from the last start of resync as sync source. */
2893
2894                         if (mdev->tconn->agreed_pro_version < 91)
2895                                 return -1091;
2896
2897                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2898                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2899
2900                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2901                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2902                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2903
2904                         return 1;
2905                 }
2906         }
2907
2908
2909         *rule_nr = 80;
2910         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2911         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2912                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2913                 if (self == peer)
2914                         return 2;
2915         }
2916
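        /* Only split-brain cases are left: matching (non-zero) bitmap UUIDs
         * allow an auto-recovery attempt (rule 90, +100); a match merely in the
         * history UUIDs means related but diverged data, disconnect
         * (rule 100, -100).  Anything else is unrelated data (-1000). */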
2917         *rule_nr = 90;
2918         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2919         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2920         if (self == peer && self != ((u64)0))
2921                 return 100;
2922
2923         *rule_nr = 100;
2924         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2925                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2926                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2927                         peer = mdev->p_uuid[j] & ~((u64)1);
2928                         if (self == peer)
2929                                 return -100;
2930                 }
2931         }
2932
2933         return -1000;
2934 }
2935
2936 /* drbd_sync_handshake() returns the new conn state on success, or
2937    C_MASK on failure.
2938  */
2939 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2940                                            enum drbd_disk_state peer_disk) __must_hold(local)
2941 {
2942         enum drbd_conns rv = C_MASK;
2943         enum drbd_disk_state mydisk;
2944         struct net_conf *nc;
2945         int hg, rule_nr, rr_conflict, tentative;
2946
2947         mydisk = mdev->state.disk;
2948         if (mydisk == D_NEGOTIATING)
2949                 mydisk = mdev->new_state_tmp.disk;
2950
2951         dev_info(DEV, "drbd_sync_handshake:\n");
2952         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2953         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2954                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2955
2956         hg = drbd_uuid_compare(mdev, &rule_nr);
2957
2958         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2959
2960         if (hg == -1000) {
2961                 dev_alert(DEV, "Unrelated data, aborting!\n");
2962                 return C_MASK;
2963         }
2964         if (hg < -1000) {
2965                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2966                 return C_MASK;
2967         }
2968
2969         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2970             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2971                 int f = (hg == -100) || abs(hg) == 2;
2972                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2973                 if (f)
2974                         hg = hg*2;
2975                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2976                      hg > 0 ? "source" : "target");
2977         }
2978
2979         if (abs(hg) == 100)
2980                 drbd_khelper(mdev, "initial-split-brain");
2981
2982         rcu_read_lock();
2983         nc = rcu_dereference(mdev->tconn->net_conf);
2984
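        /* Automatic split-brain recovery: apply the after-sb-{0,1,2}pri policy
         * matching the number of current primaries.  With always-asbp set, the
         * policies are also applied when only the UUIDs flagged the split
         * brain (hg == -100). */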
2985         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2986                 int pcount = (mdev->state.role == R_PRIMARY)
2987                            + (peer_role == R_PRIMARY);
2988                 int forced = (hg == -100);
2989
2990                 switch (pcount) {
2991                 case 0:
2992                         hg = drbd_asb_recover_0p(mdev);
2993                         break;
2994                 case 1:
2995                         hg = drbd_asb_recover_1p(mdev);
2996                         break;
2997                 case 2:
2998                         hg = drbd_asb_recover_2p(mdev);
2999                         break;
3000                 }
3001                 if (abs(hg) < 100) {
3002                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
3003                              "automatically solved. Sync from %s node\n",
3004                              pcount, (hg < 0) ? "peer" : "this");
3005                         if (forced) {
3006                                 dev_warn(DEV, "Doing a full sync, since"
3007                                      " UUIDs were ambiguous.\n");
3008                                 hg = hg*2;
3009                         }
3010                 }
3011         }
3012
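        /* Manual split-brain resolution: if exactly one side was configured
         * with discard-my-data (our DISCARD_MY_DATA flag vs. bit 0 of the
         * peer's UI_FLAGS), that side becomes the sync target. */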
3013         if (hg == -100) {
3014                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3015                         hg = -1;
3016                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3017                         hg = 1;
3018
3019                 if (abs(hg) < 100)
3020                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3021                              "Sync from %s node\n",
3022                              (hg < 0) ? "peer" : "this");
3023         }
3024         rr_conflict = nc->rr_conflict;
3025         tentative = nc->tentative;
3026         rcu_read_unlock();
3027
3028         if (hg == -100) {
3029                 /* FIXME this log message is not correct if we end up here
3030                  * after an attempted attach on a diskless node.
3031                  * We just refuse to attach -- well, we drop the "connection"
3032                  * to that disk, in a way... */
3033                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3034                 drbd_khelper(mdev, "split-brain");
3035                 return C_MASK;
3036         }
3037
3038         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3039                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3040                 return C_MASK;
3041         }
3042
3043         if (hg < 0 && /* by intention we do not use mydisk here. */
3044             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3045                 switch (rr_conflict) {
3046                 case ASB_CALL_HELPER:
3047                         drbd_khelper(mdev, "pri-lost");
3048                         /* fall through */
3049                 case ASB_DISCONNECT:
3050                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3051                         return C_MASK;
3052                 case ASB_VIOLENTLY:
3053                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3054                              " assumption\n");
3055                 }
3056         }
3057
3058         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3059                 if (hg == 0)
3060                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3061                 else
3062                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3063                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3064                                  abs(hg) >= 2 ? "full" : "bit-map based");
3065                 return C_MASK;
3066         }
3067
3068         if (abs(hg) >= 2) {
3069                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3070                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3071                                         BM_LOCKED_SET_ALLOWED))
3072                         return C_MASK;
3073         }
3074
3075         if (hg > 0) { /* become sync source. */
3076                 rv = C_WF_BITMAP_S;
3077         } else if (hg < 0) { /* become sync target */
3078                 rv = C_WF_BITMAP_T;
3079         } else {
3080                 rv = C_CONNECTED;
3081                 if (drbd_bm_total_weight(mdev)) {
3082                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3083                              drbd_bm_total_weight(mdev));
3084                 }
3085         }
3086
3087         return rv;
3088 }
3089
3090 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3091 {
3092         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3093         if (peer == ASB_DISCARD_REMOTE)
3094                 return ASB_DISCARD_LOCAL;
3095
3096         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3097         if (peer == ASB_DISCARD_LOCAL)
3098                 return ASB_DISCARD_REMOTE;
3099
3100         /* everything else is valid if they are equal on both sides. */
3101         return peer;
3102 }
3103
3104 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3105 {
3106         struct p_protocol *p = pi->data;
3107         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3108         int p_proto, p_discard_my_data, p_two_primaries, cf;
3109         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3110         char integrity_alg[SHARED_SECRET_MAX] = "";
3111         struct crypto_hash *peer_integrity_tfm = NULL;
3112         void *int_dig_in = NULL, *int_dig_vv = NULL;
3113
3114         p_proto         = be32_to_cpu(p->protocol);
3115         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3116         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3117         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3118         p_two_primaries = be32_to_cpu(p->two_primaries);
3119         cf              = be32_to_cpu(p->conn_flags);
3120         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3121
3122         if (tconn->agreed_pro_version >= 87) {
3123                 int err;
3124
3125                 if (pi->size > sizeof(integrity_alg))
3126                         return -EIO;
3127                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3128                 if (err)
3129                         return err;
3130                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3131         }
3132
3133         if (pi->cmd != P_PROTOCOL_UPDATE) {
3134                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3135
3136                 if (cf & CF_DRY_RUN)
3137                         set_bit(CONN_DRY_RUN, &tconn->flags);
3138
3139                 rcu_read_lock();
3140                 nc = rcu_dereference(tconn->net_conf);
3141
3142                 if (p_proto != nc->wire_protocol) {
3143                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3144                         goto disconnect_rcu_unlock;
3145                 }
3146
3147                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3148                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3149                         goto disconnect_rcu_unlock;
3150                 }
3151
3152                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3153                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3154                         goto disconnect_rcu_unlock;
3155                 }
3156
3157                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3158                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3159                         goto disconnect_rcu_unlock;
3160                 }
3161
3162                 if (p_discard_my_data && nc->discard_my_data) {
3163                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3164                         goto disconnect_rcu_unlock;
3165                 }
3166
3167                 if (p_two_primaries != nc->two_primaries) {
3168                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3169                         goto disconnect_rcu_unlock;
3170                 }
3171
3172                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3173                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3174                         goto disconnect_rcu_unlock;
3175                 }
3176
3177                 rcu_read_unlock();
3178         }
3179
3180         if (integrity_alg[0]) {
3181                 int hash_size;
3182
3183                 /*
3184                  * We can only change the peer data integrity algorithm
3185                  * here.  Changing our own data integrity algorithm
3186                  * requires that we send a P_PROTOCOL_UPDATE packet at
3187                  * the same time; otherwise, the peer has no way of
3188                  * telling at which packet boundary the algorithm
3189                  * changes.
3190                  */
3191
3192                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3193                 if (!peer_integrity_tfm) {
3194                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3195                                  integrity_alg);
3196                         goto disconnect;
3197                 }
3198
3199                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3200                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3201                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3202                 if (!(int_dig_in && int_dig_vv)) {
3203                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3204                         goto disconnect;
3205                 }
3206         }
3207
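        /* All checks passed (or this is a P_PROTOCOL_UPDATE): install a copy
         * of net_conf carrying the peer-supplied settings, and swap in the new
         * peer data-integrity transform and digest buffers. */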
3208         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3209         if (!new_net_conf) {
3210                 conn_err(tconn, "Allocation of new net_conf failed\n");
3211                 goto disconnect;
3212         }
3213
3214         mutex_lock(&tconn->data.mutex);
3215         mutex_lock(&tconn->conf_update);
3216         old_net_conf = tconn->net_conf;
3217         *new_net_conf = *old_net_conf;
3218
3219         new_net_conf->wire_protocol = p_proto;
3220         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3221         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3222         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3223         new_net_conf->two_primaries = p_two_primaries;
3224
3225         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3226         mutex_unlock(&tconn->conf_update);
3227         mutex_unlock(&tconn->data.mutex);
3228
3229         crypto_free_hash(tconn->peer_integrity_tfm);
3230         kfree(tconn->int_dig_in);
3231         kfree(tconn->int_dig_vv);
3232         tconn->peer_integrity_tfm = peer_integrity_tfm;
3233         tconn->int_dig_in = int_dig_in;
3234         tconn->int_dig_vv = int_dig_vv;
3235
3236         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3237                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3238                           integrity_alg[0] ? integrity_alg : "(none)");
3239
3240         synchronize_rcu();
3241         kfree(old_net_conf);
3242         return 0;
3243
3244 disconnect_rcu_unlock:
3245         rcu_read_unlock();
3246 disconnect:
3247         crypto_free_hash(peer_integrity_tfm);
3248         kfree(int_dig_in);
3249         kfree(int_dig_vv);
3250         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3251         return -EIO;
3252 }
3253
3254 /* helper function
3255  * input: alg name, feature name
3256  * return: NULL (alg name was "")
3257  *         ERR_PTR(error) if something goes wrong
3258  *         or the crypto hash ptr, if it worked out ok. */
3259 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3260                 const char *alg, const char *name)
3261 {
3262         struct crypto_hash *tfm;
3263
3264         if (!alg[0])
3265                 return NULL;
3266
3267         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3268         if (IS_ERR(tfm)) {
3269                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3270                         alg, name, PTR_ERR(tfm));
3271                 return tfm;
3272         }
3273         return tfm;
3274 }
3275
3276 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3277 {
3278         void *buffer = tconn->data.rbuf;
3279         int size = pi->size;
3280
3281         while (size) {
3282                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3283                 s = drbd_recv(tconn, buffer, s);
3284                 if (s <= 0) {
3285                         if (s < 0)
3286                                 return s;
3287                         break;
3288                 }
3289                 size -= s;
3290         }
3291         if (size)
3292                 return -EIO;
3293         return 0;
3294 }
3295
3296 /*
3297  * config_unknown_volume  -  device configuration command for unknown volume
3298  *
3299  * When a device is added to an existing connection, the node on which the
3300  * device is added first will send configuration commands to its peer but the
3301  * peer will not know about the device yet.  It will warn and ignore these
3302  * commands.  Once the device is added on the second node, the second node will
3303  * send the same device configuration commands, but in the other direction.
3304  *
3305  * (We can also end up here if drbd is misconfigured.)
3306  */
3307 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3308 {
3309         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3310                   cmdname(pi->cmd), pi->vnr);
3311         return ignore_remaining_packet(tconn, pi);
3312 }
3313
3314 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3315 {
3316         struct drbd_conf *mdev;
3317         struct p_rs_param_95 *p;
3318         unsigned int header_size, data_size, exp_max_sz;
3319         struct crypto_hash *verify_tfm = NULL;
3320         struct crypto_hash *csums_tfm = NULL;
3321         struct net_conf *old_net_conf, *new_net_conf = NULL;
3322         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3323         const int apv = tconn->agreed_pro_version;
3324         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3325         int fifo_size = 0;
3326         int err;
3327
3328         mdev = vnr_to_mdev(tconn, pi->vnr);
3329         if (!mdev)
3330                 return config_unknown_volume(tconn, pi);
3331
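        /* The SyncParam packet grew over the protocol versions: up to apv 87
         * plain p_rs_param, apv 88 adds the shared secret, apv 89..94 use
         * p_rs_param_89, and apv 95+ add the dynamic resync controller
         * settings in p_rs_param_95. */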
3332         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3333                     : apv == 88 ? sizeof(struct p_rs_param)
3334                                         + SHARED_SECRET_MAX
3335                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3336                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3337
3338         if (pi->size > exp_max_sz) {
3339                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3340                     pi->size, exp_max_sz);
3341                 return -EIO;
3342         }
3343
3344         if (apv <= 88) {
3345                 header_size = sizeof(struct p_rs_param);
3346                 data_size = pi->size - header_size;
3347         } else if (apv <= 94) {
3348                 header_size = sizeof(struct p_rs_param_89);
3349                 data_size = pi->size - header_size;
3350                 D_ASSERT(data_size == 0);
3351         } else {
3352                 header_size = sizeof(struct p_rs_param_95);
3353                 data_size = pi->size - header_size;
3354                 D_ASSERT(data_size == 0);
3355         }
3356
3357         /* initialize verify_alg and csums_alg */
3358         p = pi->data;
3359         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3360
3361         err = drbd_recv_all(mdev->tconn, p, header_size);
3362         if (err)
3363                 return err;
3364
3365         mutex_lock(&mdev->tconn->conf_update);
3366         old_net_conf = mdev->tconn->net_conf;
3367         if (get_ldev(mdev)) {
3368                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3369                 if (!new_disk_conf) {
3370                         put_ldev(mdev);
3371                         mutex_unlock(&mdev->tconn->conf_update);
3372                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3373                         return -ENOMEM;
3374                 }
3375
3376                 old_disk_conf = mdev->ldev->disk_conf;
3377                 *new_disk_conf = *old_disk_conf;
3378
3379                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3380         }
3381
3382         if (apv >= 88) {
3383                 if (apv == 88) {
3384                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3385                                 dev_err(DEV, "verify-alg of wrong size, "
3386                                         "peer wants %u, accepting only up to %u byte\n",
3387                                         data_size, SHARED_SECRET_MAX);
3388                                 err = -EIO;
3389                                 goto reconnect;
3390                         }
3391
3392                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3393                         if (err)
3394                                 goto reconnect;
3395                         /* we expect NUL terminated string */
3396                         /* but just in case someone tries to be evil */
3397                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3398                         p->verify_alg[data_size-1] = 0;
3399
3400                 } else /* apv >= 89 */ {
3401                         /* we still expect NUL terminated strings */
3402                         /* but just in case someone tries to be evil */
3403                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3404                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3405                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3406                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3407                 }
3408
3409                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3410                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3411                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3412                                     old_net_conf->verify_alg, p->verify_alg);
3413                                 goto disconnect;
3414                         }
3415                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3416                                         p->verify_alg, "verify-alg");
3417                         if (IS_ERR(verify_tfm)) {
3418                                 verify_tfm = NULL;
3419                                 goto disconnect;
3420                         }
3421                 }
3422
3423                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3424                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3425                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3426                                     old_net_conf->csums_alg, p->csums_alg);
3427                                 goto disconnect;
3428                         }
3429                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3430                                         p->csums_alg, "csums-alg");
3431                         if (IS_ERR(csums_tfm)) {
3432                                 csums_tfm = NULL;
3433                                 goto disconnect;
3434                         }
3435                 }
3436
3437                 if (apv > 94 && new_disk_conf) {
3438                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3439                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3440                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3441                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3442
3443                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3444                         if (fifo_size != mdev->rs_plan_s->size) {
3445                                 new_plan = fifo_alloc(fifo_size);
3446                                 if (!new_plan) {
3447                                         dev_err(DEV, "allocation of fifo_buffer failed\n");
3448                                         put_ldev(mdev);
3449                                         goto disconnect;
3450                                 }
3451                         }
3452                 }
3453
3454                 if (verify_tfm || csums_tfm) {
3455                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3456                         if (!new_net_conf) {
3457                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3458                                 goto disconnect;
3459                         }
3460
3461                         *new_net_conf = *old_net_conf;
3462
3463                         if (verify_tfm) {
3464                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3465                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3466                                 crypto_free_hash(mdev->tconn->verify_tfm);
3467                                 mdev->tconn->verify_tfm = verify_tfm;
3468                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3469                         }
3470                         if (csums_tfm) {
3471                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3472                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3473                                 crypto_free_hash(mdev->tconn->csums_tfm);
3474                                 mdev->tconn->csums_tfm = csums_tfm;
3475                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3476                         }
3477                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3478                 }
3479         }
3480
3481         if (new_disk_conf) {
3482                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3483                 put_ldev(mdev);
3484         }
3485
3486         if (new_plan) {
3487                 old_plan = mdev->rs_plan_s;
3488                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3489         }
3490
3491         mutex_unlock(&mdev->tconn->conf_update);
3492         synchronize_rcu();
3493         if (new_net_conf)
3494                 kfree(old_net_conf);
3495         kfree(old_disk_conf);
3496         kfree(old_plan);
3497
3498         return 0;
3499
3500 reconnect:
3501         if (new_disk_conf) {
3502                 put_ldev(mdev);
3503                 kfree(new_disk_conf);
3504         }
3505         mutex_unlock(&mdev->tconn->conf_update);
3506         return -EIO;
3507
3508 disconnect:
3509         kfree(new_plan);
3510         if (new_disk_conf) {
3511                 put_ldev(mdev);
3512                 kfree(new_disk_conf);
3513         }
3514         mutex_unlock(&mdev->tconn->conf_update);
3515         /* just for completeness: actually not needed,
3516          * as this is not reached if csums_tfm was ok. */
3517         crypto_free_hash(csums_tfm);
3518         /* but free the verify_tfm again, if csums_tfm did not work out */
3519         crypto_free_hash(verify_tfm);
3520         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3521         return -EIO;
3522 }
3523
3524 /* warn if the arguments differ by more than 12.5% */
3525 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3526         const char *s, sector_t a, sector_t b)
3527 {
3528         sector_t d;
3529         if (a == 0 || b == 0)
3530                 return;
3531         d = (a > b) ? (a - b) : (b - a);
3532         if (d > (a>>3) || d > (b>>3))
3533                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3534                      (unsigned long long)a, (unsigned long long)b);
3535 }
3536
3537 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3538 {
3539         struct drbd_conf *mdev;
3540         struct p_sizes *p = pi->data;
3541         enum determine_dev_size dd = unchanged;
3542         sector_t p_size, p_usize, my_usize;
3543         int ldsc = 0; /* local disk size changed */
3544         enum dds_flags ddsf;
3545
3546         mdev = vnr_to_mdev(tconn, pi->vnr);
3547         if (!mdev)
3548                 return config_unknown_volume(tconn, pi);
3549
3550         p_size = be64_to_cpu(p->d_size);
3551         p_usize = be64_to_cpu(p->u_size);
3552
3553         /* just store the peer's disk size for now.
3554          * we still need to figure out whether we accept that. */
3555         mdev->p_size = p_size;
3556
3557         if (get_ldev(mdev)) {
3558                 rcu_read_lock();
3559                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3560                 rcu_read_unlock();
3561
3562                 warn_if_differ_considerably(mdev, "lower level device sizes",
3563                            p_size, drbd_get_max_capacity(mdev->ldev));
3564                 warn_if_differ_considerably(mdev, "user requested size",
3565                                             p_usize, my_usize);
3566
3567                 /* if this is the first connect, or an otherwise expected
3568                  * param exchange, choose the minimum */
3569                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3570                         p_usize = min_not_zero(my_usize, p_usize);
3571
3572                 /* Never shrink a device with usable data during connect.
3573                    But allow online shrinking if we are connected. */
3574                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3575                     drbd_get_capacity(mdev->this_bdev) &&
3576                     mdev->state.disk >= D_OUTDATED &&
3577                     mdev->state.conn < C_CONNECTED) {
3578                         dev_err(DEV, "The peer's disk size is too small!\n");
3579                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3580                         put_ldev(mdev);
3581                         return -EIO;
3582                 }
3583
3584                 if (my_usize != p_usize) {
3585                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3586
3587                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3588                         if (!new_disk_conf) {
3589                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3590                                 put_ldev(mdev);
3591                                 return -ENOMEM;
3592                         }
3593
3594                         mutex_lock(&mdev->tconn->conf_update);
3595                         old_disk_conf = mdev->ldev->disk_conf;
3596                         *new_disk_conf = *old_disk_conf;
3597                         new_disk_conf->disk_size = p_usize;
3598
3599                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3600                         mutex_unlock(&mdev->tconn->conf_update);
3601                         synchronize_rcu();
3602                         kfree(old_disk_conf);
3603
3604                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3605                                  (unsigned long)p_usize);
3606                 }
3607
3608                 put_ldev(mdev);
3609         }
3610
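        /* With a local disk attached, (re)determine our device size from the
         * agreed parameters; a diskless node simply adopts the peer's size. */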
3611         ddsf = be16_to_cpu(p->dds_flags);
3612         if (get_ldev(mdev)) {
3613                 dd = drbd_determine_dev_size(mdev, ddsf);
3614                 put_ldev(mdev);
3615                 if (dd == dev_size_error)
3616                         return -EIO;
3617                 drbd_md_sync(mdev);
3618         } else {
3619                 /* I am diskless, need to accept the peer's size. */
3620                 drbd_set_my_capacity(mdev, p_size);
3621         }
3622
3623         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3624         drbd_reconsider_max_bio_size(mdev);
3625
3626         if (get_ldev(mdev)) {
3627                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3628                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3629                         ldsc = 1;
3630                 }
3631
3632                 put_ldev(mdev);
3633         }
3634
3635         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3636                 if (be64_to_cpu(p->c_size) !=
3637                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3638                         /* we have different sizes, probably peer
3639                          * needs to know my new size... */
3640                         drbd_send_sizes(mdev, 0, ddsf);
3641                 }
3642                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3643                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3644                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3645                             mdev->state.disk >= D_INCONSISTENT) {
3646                                 if (ddsf & DDSF_NO_RESYNC)
3647                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3648                                 else
3649                                         resync_after_online_grow(mdev);
3650                         } else
3651                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3652                 }
3653         }
3654
3655         return 0;
3656 }
3657
3658 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3659 {
3660         struct drbd_conf *mdev;
3661         struct p_uuids *p = pi->data;
3662         u64 *p_uuid;
3663         int i, updated_uuids = 0;
3664
3665         mdev = vnr_to_mdev(tconn, pi->vnr);
3666         if (!mdev)
3667                 return config_unknown_volume(tconn, pi);
3668
3669         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3670
3671         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3672                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3673
3674         kfree(mdev->p_uuid);
3675         mdev->p_uuid = p_uuid;
3676
3677         if (mdev->state.conn < C_CONNECTED &&
3678             mdev->state.disk < D_INCONSISTENT &&
3679             mdev->state.role == R_PRIMARY &&
3680             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3681                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3682                     (unsigned long long)mdev->ed_uuid);
3683                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3684                 return -EIO;
3685         }
3686
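        /* A just-created device pair may skip the initial full sync: the peer
         * signals this in UI_FLAGS (bit value 8), provided both sides speak
         * protocol 90 or newer and our current UUID is still UUID_JUST_CREATED. */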
3687         if (get_ldev(mdev)) {
3688                 int skip_initial_sync =
3689                         mdev->state.conn == C_CONNECTED &&
3690                         mdev->tconn->agreed_pro_version >= 90 &&
3691                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3692                         (p_uuid[UI_FLAGS] & 8);
3693                 if (skip_initial_sync) {
3694                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3695                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3696                                         "clear_n_write from receive_uuids",
3697                                         BM_LOCKED_TEST_ALLOWED);
3698                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3699                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3700                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3701                                         CS_VERBOSE, NULL);
3702                         drbd_md_sync(mdev);
3703                         updated_uuids = 1;
3704                 }
3705                 put_ldev(mdev);
3706         } else if (mdev->state.disk < D_INCONSISTENT &&
3707                    mdev->state.role == R_PRIMARY) {
3708                 /* I am a diskless primary, the peer just created a new current UUID
3709                    for me. */
3710                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3711         }
3712
3713         /* Before we test the disk state, wait until any possibly ongoing
3714            cluster-wide state change has finished. That is important if we are
3715            primary and are detaching from our disk: we need to see the new
3716            disk state... */
3717         mutex_lock(mdev->state_mutex);
3718         mutex_unlock(mdev->state_mutex);
3719         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3720                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3721
3722         if (updated_uuids)
3723                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3724
3725         return 0;
3726 }
3727
3728 /**
3729  * convert_state() - Converts the peer's view of the cluster state to our point of view
3730  * @ps:         The state as seen by the peer.
3731  */
3732 static union drbd_state convert_state(union drbd_state ps)
3733 {
3734         union drbd_state ms;
3735
3736         static enum drbd_conns c_tab[] = {
3737                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3738                 [C_CONNECTED] = C_CONNECTED,
3739
3740                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3741                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3742                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3743                 [C_VERIFY_S]       = C_VERIFY_T,
3744                 [C_MASK]   = C_MASK,
3745         };
3746
3747         ms.i = ps.i;
3748
3749         ms.conn = c_tab[ps.conn];
3750         ms.peer = ps.role;
3751         ms.role = ps.peer;
3752         ms.pdsk = ps.disk;
3753         ms.disk = ps.pdsk;
3754         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3755
3756         return ms;
3757 }
3758
3759 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3760 {
3761         struct drbd_conf *mdev;
3762         struct p_req_state *p = pi->data;
3763         union drbd_state mask, val;
3764         enum drbd_state_rv rv;
3765
3766         mdev = vnr_to_mdev(tconn, pi->vnr);
3767         if (!mdev)
3768                 return -EIO;
3769
3770         mask.i = be32_to_cpu(p->mask);
3771         val.i = be32_to_cpu(p->val);
3772
3773         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3774             mutex_is_locked(mdev->state_mutex)) {
3775                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3776                 return 0;
3777         }
3778
3779         mask = convert_state(mask);
3780         val = convert_state(val);
3781
3782         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3783         drbd_send_sr_reply(mdev, rv);
3784
3785         drbd_md_sync(mdev);
3786
3787         return 0;
3788 }
3789
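/* Like receive_req_state(), but for connection-wide state change requests,
 * serialized via the connection's cstate_mutex instead of the per-volume
 * state mutex. */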
3790 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3791 {
3792         struct p_req_state *p = pi->data;
3793         union drbd_state mask, val;
3794         enum drbd_state_rv rv;
3795
3796         mask.i = be32_to_cpu(p->mask);
3797         val.i = be32_to_cpu(p->val);
3798
3799         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3800             mutex_is_locked(&tconn->cstate_mutex)) {
3801                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3802                 return 0;
3803         }
3804
3805         mask = convert_state(mask);
3806         val = convert_state(val);
3807
3808         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3809         conn_send_sr_reply(tconn, rv);
3810
3811         return 0;
3812 }
3813
3814 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3815 {
3816         struct drbd_conf *mdev;
3817         struct p_state *p = pi->data;
3818         union drbd_state os, ns, peer_state;
3819         enum drbd_disk_state real_peer_disk;
3820         enum chg_state_flags cs_flags;
3821         int rv;
3822
3823         mdev = vnr_to_mdev(tconn, pi->vnr);
3824         if (!mdev)
3825                 return config_unknown_volume(tconn, pi);
3826
3827         peer_state.i = be32_to_cpu(p->state);
3828
3829         real_peer_disk = peer_state.disk;
3830         if (peer_state.disk == D_NEGOTIATING) {
3831                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3832                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3833         }
3834
3835         spin_lock_irq(&mdev->tconn->req_lock);
3836  retry:
3837         os = ns = drbd_read_state(mdev);
3838         spin_unlock_irq(&mdev->tconn->req_lock);
3839
3840         /* If some other part of the code (asender thread, timeout)
3841          * already decided to close the connection again,
3842          * we must not "re-establish" it here. */
3843         if (os.conn <= C_TEAR_DOWN)
3844                 return -ECONNRESET;
3845
3846         /* If this is the "end of sync" confirmation, usually the peer disk
3847          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3848          * set) resync started in PausedSyncT, or if the timing of pause-/
3849          * unpause-sync events has been "just right", the peer disk may
3850          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3851          */
3852         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3853             real_peer_disk == D_UP_TO_DATE &&
3854             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3855                 /* If we are (becoming) SyncSource, but peer is still in sync
3856                  * preparation, ignore its uptodate-ness to avoid flapping, it
3857                  * will change to inconsistent once the peer reaches active
3858                  * syncing states.
3859                  * It may have changed syncer-paused flags, however, so we
3860                  * cannot ignore this completely. */
3861                 if (peer_state.conn > C_CONNECTED &&
3862                     peer_state.conn < C_SYNC_SOURCE)
3863                         real_peer_disk = D_INCONSISTENT;
3864
3865                 /* if peer_state changes to connected at the same time,
3866                  * it explicitly notifies us that it finished resync.
3867                  * Maybe we should finish it up, too? */
3868                 else if (os.conn >= C_SYNC_SOURCE &&
3869                          peer_state.conn == C_CONNECTED) {
3870                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3871                                 drbd_resync_finished(mdev);
3872                         return 0;
3873                 }
3874         }
3875
3876         /* explicit verify finished notification, stop sector reached. */
3877         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3878             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3879                 ov_out_of_sync_print(mdev);
3880                 drbd_resync_finished(mdev);
3881                 return 0;
3882         }
3883
3884         /* peer says his disk is inconsistent, while we think it is uptodate,
3885          * and this happens while the peer still thinks we have a sync going on,
3886          * but we think we are already done with the sync.
3887          * We ignore this to avoid flapping pdsk.
3888          * This should not happen, if the peer is a recent version of drbd. */
3889         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3890             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3891                 real_peer_disk = D_UP_TO_DATE;
3892
3893         if (ns.conn == C_WF_REPORT_PARAMS)
3894                 ns.conn = C_CONNECTED;
3895
3896         if (peer_state.conn == C_AHEAD)
3897                 ns.conn = C_BEHIND;
3898
3899         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3900             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3901                 int cr; /* consider resync */
3902
3903                 /* if we established a new connection */
3904                 cr  = (os.conn < C_CONNECTED);
3905                 /* if we had an established connection
3906                  * and one of the nodes newly attaches a disk */
3907                 cr |= (os.conn == C_CONNECTED &&
3908                        (peer_state.disk == D_NEGOTIATING ||
3909                         os.disk == D_NEGOTIATING));
3910                 /* if we have both been inconsistent, and the peer has been
3911                  * forced to be UpToDate with --overwrite-data */
3912                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3913                 /* if we had been plain connected, and the admin requested to
3914                  * start a sync by "invalidate" or "invalidate-remote" */
3915                 cr |= (os.conn == C_CONNECTED &&
3916                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3917                                  peer_state.conn <= C_WF_BITMAP_T));
3918
3919                 if (cr)
3920                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3921
3922                 put_ldev(mdev);
3923                 if (ns.conn == C_MASK) {
3924                         ns.conn = C_CONNECTED;
3925                         if (mdev->state.disk == D_NEGOTIATING) {
3926                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3927                         } else if (peer_state.disk == D_NEGOTIATING) {
3928                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3929                                 peer_state.disk = D_DISKLESS;
3930                                 real_peer_disk = D_DISKLESS;
3931                         } else {
3932                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3933                                         return -EIO;
3934                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3935                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3936                                 return -EIO;
3937                         }
3938                 }
3939         }
3940
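        /* Commit the merged state under the request lock; if our own state
         * changed in the meantime, start over with the fresh values. */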
3941         spin_lock_irq(&mdev->tconn->req_lock);
3942         if (os.i != drbd_read_state(mdev).i)
3943                 goto retry;
3944         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3945         ns.peer = peer_state.role;
3946         ns.pdsk = real_peer_disk;
3947         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3948         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3949                 ns.disk = mdev->new_state_tmp.disk;
3950         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3951         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3952             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3953                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3954                    for temporary network outages! */
3955                 spin_unlock_irq(&mdev->tconn->req_lock);
3956                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3957                 tl_clear(mdev->tconn);
3958                 drbd_uuid_new_current(mdev);
3959                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3960                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3961                 return -EIO;
3962         }
3963         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3964         ns = drbd_read_state(mdev);
3965         spin_unlock_irq(&mdev->tconn->req_lock);
3966
3967         if (rv < SS_SUCCESS) {
3968                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3969                 return -EIO;
3970         }
3971
3972         if (os.conn > C_WF_REPORT_PARAMS) {
3973                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3974                     peer_state.disk != D_NEGOTIATING ) {
3975                         /* we want resync, peer has not yet decided to sync... */
3976                         /* Nowadays only used when forcing a node into primary role and
3977                            setting its disk to UpToDate at the same time */
3978                         drbd_send_uuids(mdev);
3979                         drbd_send_current_state(mdev);
3980                 }
3981         }
3982
3983         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3984
3985         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3986
3987         return 0;
3988 }
3989
3990 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3991 {
3992         struct drbd_conf *mdev;
3993         struct p_rs_uuid *p = pi->data;
3994
3995         mdev = vnr_to_mdev(tconn, pi->vnr);
3996         if (!mdev)
3997                 return -EIO;
3998
3999         wait_event(mdev->misc_wait,
4000                    mdev->state.conn == C_WF_SYNC_UUID ||
4001                    mdev->state.conn == C_BEHIND ||
4002                    mdev->state.conn < C_CONNECTED ||
4003                    mdev->state.disk < D_NEGOTIATING);
4004
4005         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4006
4007         /* Here the _drbd_uuid_ functions are right, current should
4008            _not_ be rotated into the history */
4009         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4010                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4011                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4012
4013                 drbd_print_uuids(mdev, "updated sync uuid");
4014                 drbd_start_resync(mdev, C_SYNC_TARGET);
4015
4016                 put_ldev(mdev);
4017         } else
4018                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4019
4020         return 0;
4021 }
4022
4023 /**
4024  * receive_bitmap_plain
4025  *
4026  * Return 0 when done, 1 when another iteration is needed, and a negative error
4027  * code upon failure.
4028  */
4029 static int
4030 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4031                      unsigned long *p, struct bm_xfer_ctx *c)
4032 {
4033         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4034                                  drbd_header_size(mdev->tconn);
4035         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4036                                        c->bm_words - c->word_offset);
4037         unsigned int want = num_words * sizeof(*p);
4038         int err;
4039
4040         if (want != size) {
4041                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4042                 return -EIO;
4043         }
4044         if (want == 0)
4045                 return 0;
4046         err = drbd_recv_all(mdev->tconn, p, want);
4047         if (err)
4048                 return err;
4049
4050         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4051
4052         c->word_offset += num_words;
4053         c->bit_offset = c->word_offset * BITS_PER_LONG;
4054         if (c->bit_offset > c->bm_bits)
4055                 c->bit_offset = c->bm_bits;
4056
4057         return 1;
4058 }
4059
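/* The compressed-bitmap 'encoding' byte packs three fields: bits 0..3 hold
 * the encoding (enum drbd_bitmap_code), bits 4..6 the number of pad bits in
 * the last byte, and bit 7 the toggle start value for the RLE stream. */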
4060 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4061 {
4062         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4063 }
4064
4065 static int dcbp_get_start(struct p_compressed_bm *p)
4066 {
4067         return (p->encoding & 0x80) != 0;
4068 }
4069
4070 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4071 {
4072         return (p->encoding >> 4) & 0x7;
4073 }
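
/*
 * Layout of the "encoding" byte of struct p_compressed_bm, as decoded by
 * the three helpers above:
 *   bits 0-3: bitmap code (enum drbd_bitmap_code, e.g. RLE_VLI_Bits)
 *   bits 4-6: number of pad bits at the end of the bit stream
 *   bit    7: whether the first run in the stream describes set bits
 */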
4074
4075 /**
4076  * recv_bm_rle_bits
4077  *
4078  * Return 0 when done, 1 when another iteration is needed, and a negative error
4079  * code upon failure.
4080  */
4081 static int
4082 recv_bm_rle_bits(struct drbd_conf *mdev,
4083                  struct p_compressed_bm *p,
4084                  struct bm_xfer_ctx *c,
4085                  unsigned int len)
4086 {
4087         struct bitstream bs;
4088         u64 look_ahead;
4089         u64 rl;
4090         u64 tmp;
4091         unsigned long s = c->bit_offset;
4092         unsigned long e;
4093         int toggle = dcbp_get_start(p);
4094         int have;
4095         int bits;
4096
4097         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4098
4099         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4100         if (bits < 0)
4101                 return -EIO;
4102
4103         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4104                 bits = vli_decode_bits(&rl, look_ahead);
4105                 if (bits <= 0)
4106                         return -EIO;
4107
4108                 if (toggle) {
4109                         e = s + rl - 1;
4110                         if (e >= c->bm_bits) {
4111                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4112                                 return -EIO;
4113                         }
4114                         _drbd_bm_set_bits(mdev, s, e);
4115                 }
4116
4117                 if (have < bits) {
4118                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4119                                 have, bits, look_ahead,
4120                                 (unsigned int)(bs.cur.b - p->code),
4121                                 (unsigned int)bs.buf_len);
4122                         return -EIO;
4123                 }
4124                 look_ahead >>= bits;
4125                 have -= bits;
4126
4127                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4128                 if (bits < 0)
4129                         return -EIO;
4130                 look_ahead |= tmp << have;
4131                 have += bits;
4132         }
4133
4134         c->bit_offset = s;
4135         bm_xfer_ctx_bit_to_word_offset(c);
4136
4137         return (s != c->bm_bits);
4138 }
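
/*
 * Decoding sketch for the loop above: the payload is a stream of VLI
 * encoded run lengths.  Runs alternate between '1' and '0' bits, starting
 * with the polarity given by dcbp_get_start().  Only the '1' runs are
 * applied to the bitmap via _drbd_bm_set_bits(); '0' runs merely advance
 * the bit offset.  A 64 bit look-ahead window is refilled from the bit
 * stream after every run so vli_decode_bits() always sees enough bits.
 */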
4139
4140 /**
4141  * decode_bitmap_c
4142  *
4143  * Return 0 when done, 1 when another iteration is needed, and a negative error
4144  * code upon failure.
4145  */
4146 static int
4147 decode_bitmap_c(struct drbd_conf *mdev,
4148                 struct p_compressed_bm *p,
4149                 struct bm_xfer_ctx *c,
4150                 unsigned int len)
4151 {
4152         if (dcbp_get_code(p) == RLE_VLI_Bits)
4153                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4154
4155         /* other variants had been implemented for evaluation,
4156          * but have been dropped as this one turned out to be "best"
4157          * during all our tests. */
4158
4159         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4160         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4161         return -EIO;
4162 }
4163
4164 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4165                 const char *direction, struct bm_xfer_ctx *c)
4166 {
4167         /* what would it take to transfer it "plaintext" */
4168         unsigned int header_size = drbd_header_size(mdev->tconn);
4169         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4170         unsigned int plain =
4171                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4172                 c->bm_words * sizeof(unsigned long);
4173         unsigned int total = c->bytes[0] + c->bytes[1];
4174         unsigned int r;
4175
4176         /* total cannot be zero, but just in case: */
4177         if (total == 0)
4178                 return;
4179
4180         /* don't report if not compressed */
4181         if (total >= plain)
4182                 return;
4183
4184         /* total < plain. check for overflow, still */
4185         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4186                                     : (1000 * total / plain);
4187
4188         if (r > 1000)
4189                 r = 1000;
4190
4191         r = 1000 - r;
4192         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4193              "total %u; compression: %u.%u%%\n",
4194                         direction,
4195                         c->bytes[1], c->packets[1],
4196                         c->bytes[0], c->packets[0],
4197                         total, r/10, r % 10);
4198 }
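
/*
 * Worked example for the ratio computed above (illustrative numbers):
 * with total = 100 KiB actually transferred and plain = 400 KiB worth of
 * uncompressed bitmap, r = 1000 - (1000 * 100 / 400) = 750, which is
 * printed as "compression: 75.0%".
 */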
4199
4200 /* Since we are processing the bitfield from lower addresses to higher,
4201    it does not matter whether we process it in 32 bit chunks or 64 bit
4202    chunks, as long as it is little endian. (Understand it as a byte stream,
4203    beginning with the lowest byte...) If we used big endian, we would need
4204    to process it from the highest address to the lowest, in order to be
4205    agnostic to the 32 vs 64 bit issue.
4206
4207    Returns 0 on success, a negative error code otherwise. */
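/*
 * Illustration of the chunk size argument above: the byte sequence
 * 0x01 0x00 0x00 0x00 0x00 0x00 0x00 0x80 read as two little endian
 * 32 bit words gives 0x00000001 and 0x80000000; read as one little endian
 * 64 bit word it gives 0x8000000000000001.  Either way, bit 0 and bit 63
 * of this chunk are set, so the chunk size used for processing is
 * irrelevant.
 */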
4208 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4209 {
4210         struct drbd_conf *mdev;
4211         struct bm_xfer_ctx c;
4212         int err;
4213
4214         mdev = vnr_to_mdev(tconn, pi->vnr);
4215         if (!mdev)
4216                 return -EIO;
4217
4218         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4219         /* you are supposed to send additional out-of-sync information
4220          * if you actually set bits during this phase */
4221
4222         c = (struct bm_xfer_ctx) {
4223                 .bm_bits = drbd_bm_bits(mdev),
4224                 .bm_words = drbd_bm_words(mdev),
4225         };
4226
4227         for(;;) {
4228                 if (pi->cmd == P_BITMAP)
4229                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4230                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4231                         /* MAYBE: sanity check that we speak proto >= 90,
4232                          * and the feature is enabled! */
4233                         struct p_compressed_bm *p = pi->data;
4234
4235                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4236                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4237                                 err = -EIO;
4238                                 goto out;
4239                         }
4240                         if (pi->size <= sizeof(*p)) {
4241                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4242                                 err = -EIO;
4243                                 goto out;
4244                         }
4245                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4246                         if (err)
4247                                 goto out;
4248                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4249                 } else {
4250                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4251                         err = -EIO;
4252                         goto out;
4253                 }
4254
4255                 c.packets[pi->cmd == P_BITMAP]++;
4256                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4257
4258                 if (err <= 0) {
4259                         if (err < 0)
4260                                 goto out;
4261                         break;
4262                 }
4263                 err = drbd_recv_header(mdev->tconn, pi);
4264                 if (err)
4265                         goto out;
4266         }
4267
4268         INFO_bm_xfer_stats(mdev, "receive", &c);
4269
4270         if (mdev->state.conn == C_WF_BITMAP_T) {
4271                 enum drbd_state_rv rv;
4272
4273                 err = drbd_send_bitmap(mdev);
4274                 if (err)
4275                         goto out;
4276                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4277                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4278                 D_ASSERT(rv == SS_SUCCESS);
4279         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4280                 /* admin may have requested C_DISCONNECTING,
4281                  * other threads may have noticed network errors */
4282                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4283                     drbd_conn_str(mdev->state.conn));
4284         }
4285         err = 0;
4286
4287  out:
4288         drbd_bm_unlock(mdev);
4289         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4290                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4291         return err;
4292 }
4293
4294 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4295 {
4296         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4297                  pi->cmd, pi->size);
4298
4299         return ignore_remaining_packet(tconn, pi);
4300 }
4301
4302 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4303 {
4304         /* Make sure we've acked all the TCP data associated
4305          * with the data requests being unplugged */
4306         drbd_tcp_quickack(tconn->data.socket);
4307
4308         return 0;
4309 }
4310
4311 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4312 {
4313         struct drbd_conf *mdev;
4314         struct p_block_desc *p = pi->data;
4315
4316         mdev = vnr_to_mdev(tconn, pi->vnr);
4317         if (!mdev)
4318                 return -EIO;
4319
4320         switch (mdev->state.conn) {
4321         case C_WF_SYNC_UUID:
4322         case C_WF_BITMAP_T:
4323         case C_BEHIND:
4324                 break;
4325         default:
4326                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4327                                 drbd_conn_str(mdev->state.conn));
4328         }
4329
4330         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4331
4332         return 0;
4333 }
4334
4335 struct data_cmd {
4336         int expect_payload;
4337         size_t pkt_size;
4338         int (*fn)(struct drbd_tconn *, struct packet_info *);
4339 };
4340
4341 static struct data_cmd drbd_cmd_handler[] = {
4342         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4343         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4344         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4345         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4346         [P_BITMAP]          = { 1, 0, receive_bitmap },
4347         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4348         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4349         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4350         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4351         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4352         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4353         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4354         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4355         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4356         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4357         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4358         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4359         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4360         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4361         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4362         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4363         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4364         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4365         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4366 };
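
/*
 * drbdd() below drives this table: for every received header it looks up
 * drbd_cmd_handler[pi.cmd], reads pkt_size bytes of fixed sub-header into
 * pi.data, rejects additional payload unless expect_payload is set, and
 * then calls fn() with the remaining pi.size left for the handler to read.
 */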
4367
4368 static void drbdd(struct drbd_tconn *tconn)
4369 {
4370         struct packet_info pi;
4371         size_t shs; /* sub header size */
4372         int err;
4373
4374         while (get_t_state(&tconn->receiver) == RUNNING) {
4375                 struct data_cmd *cmd;
4376
4377                 drbd_thread_current_set_cpu(&tconn->receiver);
4378                 if (drbd_recv_header(tconn, &pi))
4379                         goto err_out;
4380
4381                 cmd = &drbd_cmd_handler[pi.cmd];
4382                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4383                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4384                                  cmdname(pi.cmd), pi.cmd);
4385                         goto err_out;
4386                 }
4387
4388                 shs = cmd->pkt_size;
4389                 if (pi.size > shs && !cmd->expect_payload) {
4390                         conn_err(tconn, "No payload expected %s l:%d\n",
4391                                  cmdname(pi.cmd), pi.size);
4392                         goto err_out;
4393                 }
4394
4395                 if (shs) {
4396                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4397                         if (err)
4398                                 goto err_out;
4399                         pi.size -= shs;
4400                 }
4401
4402                 err = cmd->fn(tconn, &pi);
4403                 if (err) {
4404                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4405                                  cmdname(pi.cmd), err, pi.size);
4406                         goto err_out;
4407                 }
4408         }
4409         return;
4410
4411     err_out:
4412         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4413 }
4414
4415 void conn_flush_workqueue(struct drbd_tconn *tconn)
4416 {
4417         struct drbd_wq_barrier barr;
4418
4419         barr.w.cb = w_prev_work_done;
4420         barr.w.tconn = tconn;
4421         init_completion(&barr.done);
4422         drbd_queue_work(&tconn->sender_work, &barr.w);
4423         wait_for_completion(&barr.done);
4424 }
4425
4426 static void conn_disconnect(struct drbd_tconn *tconn)
4427 {
4428         struct drbd_conf *mdev;
4429         enum drbd_conns oc;
4430         int vnr;
4431
4432         if (tconn->cstate == C_STANDALONE)
4433                 return;
4434
4435         /* We are about to start the cleanup after connection loss.
4436          * Make sure drbd_make_request knows about that.
4437          * Usually we should be in some network failure state already,
4438          * but just in case we are not, we fix it up here.
4439          */
4440         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4441
4442         /* asender does not clean up anything. it must not interfere, either */
4443         drbd_thread_stop(&tconn->asender);
4444         drbd_free_sock(tconn);
4445
4446         rcu_read_lock();
4447         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4448                 kref_get(&mdev->kref);
4449                 rcu_read_unlock();
4450                 drbd_disconnected(mdev);
4451                 kref_put(&mdev->kref, &drbd_minor_destroy);
4452                 rcu_read_lock();
4453         }
4454         rcu_read_unlock();
4455
4456         if (!list_empty(&tconn->current_epoch->list))
4457                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4458         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4459         atomic_set(&tconn->current_epoch->epoch_size, 0);
4460         tconn->send.seen_any_write_yet = false;
4461
4462         conn_info(tconn, "Connection closed\n");
4463
4464         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4465                 conn_try_outdate_peer_async(tconn);
4466
4467         spin_lock_irq(&tconn->req_lock);
4468         oc = tconn->cstate;
4469         if (oc >= C_UNCONNECTED)
4470                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4471
4472         spin_unlock_irq(&tconn->req_lock);
4473
4474         if (oc == C_DISCONNECTING)
4475                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4476 }
4477
4478 static int drbd_disconnected(struct drbd_conf *mdev)
4479 {
4480         unsigned int i;
4481
4482         /* wait for current activity to cease. */
4483         spin_lock_irq(&mdev->tconn->req_lock);
4484         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4485         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4486         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4487         spin_unlock_irq(&mdev->tconn->req_lock);
4488
4489         /* We do not have data structures that would allow us to
4490          * get the rs_pending_cnt down to 0 again.
4491          *  * On C_SYNC_TARGET we do not have any data structures describing
4492          *    the pending RSDataRequest's we have sent.
4493          *  * On C_SYNC_SOURCE there is no data structure that tracks
4494          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4495          *  And no, it is not the sum of the reference counts in the
4496          *  resync_LRU. The resync_LRU tracks the whole operation including
4497          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4498          *  on the fly. */
4499         drbd_rs_cancel_all(mdev);
4500         mdev->rs_total = 0;
4501         mdev->rs_failed = 0;
4502         atomic_set(&mdev->rs_pending_cnt, 0);
4503         wake_up(&mdev->misc_wait);
4504
4505         del_timer_sync(&mdev->resync_timer);
4506         resync_timer_fn((unsigned long)mdev);
4507
4508         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4509          * w_make_resync_request etc. which may still be on the worker queue
4510          * to be "canceled" */
4511         drbd_flush_workqueue(mdev);
4512
4513         drbd_finish_peer_reqs(mdev);
4514
4515         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4516            might have queued more work. The flush before drbd_finish_peer_reqs() is
4517            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4518         drbd_flush_workqueue(mdev);
4519
4520         kfree(mdev->p_uuid);
4521         mdev->p_uuid = NULL;
4522
4523         if (!drbd_suspended(mdev))
4524                 tl_clear(mdev->tconn);
4525
4526         drbd_md_sync(mdev);
4527
4528         /* serialize with bitmap writeout triggered by the state change,
4529          * if any. */
4530         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4531
4532         /* tcp_close and release of sendpage pages can be deferred.  I don't
4533          * want to use SO_LINGER, because apparently it can be deferred for
4534          * more than 20 seconds (longest time I checked).
4535          *
4536          * Actually we don't care for exactly when the network stack does its
4537          * put_page(), but release our reference on these pages right here.
4538          */
4539         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4540         if (i)
4541                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4542         i = atomic_read(&mdev->pp_in_use_by_net);
4543         if (i)
4544                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4545         i = atomic_read(&mdev->pp_in_use);
4546         if (i)
4547                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4548
4549         D_ASSERT(list_empty(&mdev->read_ee));
4550         D_ASSERT(list_empty(&mdev->active_ee));
4551         D_ASSERT(list_empty(&mdev->sync_ee));
4552         D_ASSERT(list_empty(&mdev->done_ee));
4553
4554         return 0;
4555 }
4556
4557 /*
4558  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4559  * we can agree on is stored in agreed_pro_version.
4560  *
4561  * feature flags and the reserved array should be enough room for future
4562  * enhancements of the handshake protocol, and possible plugins...
4563  *
4564  * for now, they are expected to be zero, but ignored.
4565  */
4566 static int drbd_send_features(struct drbd_tconn *tconn)
4567 {
4568         struct drbd_socket *sock;
4569         struct p_connection_features *p;
4570
4571         sock = &tconn->data;
4572         p = conn_prepare_command(tconn, sock);
4573         if (!p)
4574                 return -EIO;
4575         memset(p, 0, sizeof(*p));
4576         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4577         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4578         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4579 }
4580
4581 /*
4582  * return values:
4583  *   1 yes, we have a valid connection
4584  *   0 oops, did not work out, please try again
4585  *  -1 peer talks different language,
4586  *     no point in trying again, please go standalone.
4587  */
4588 static int drbd_do_features(struct drbd_tconn *tconn)
4589 {
4590         /* ASSERT current == tconn->receiver ... */
4591         struct p_connection_features *p;
4592         const int expect = sizeof(struct p_connection_features);
4593         struct packet_info pi;
4594         int err;
4595
4596         err = drbd_send_features(tconn);
4597         if (err)
4598                 return 0;
4599
4600         err = drbd_recv_header(tconn, &pi);
4601         if (err)
4602                 return 0;
4603
4604         if (pi.cmd != P_CONNECTION_FEATURES) {
4605                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4606                          cmdname(pi.cmd), pi.cmd);
4607                 return -1;
4608         }
4609
4610         if (pi.size != expect) {
4611                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4612                      expect, pi.size);
4613                 return -1;
4614         }
4615
4616         p = pi.data;
4617         err = drbd_recv_all_warn(tconn, p, expect);
4618         if (err)
4619                 return 0;
4620
4621         p->protocol_min = be32_to_cpu(p->protocol_min);
4622         p->protocol_max = be32_to_cpu(p->protocol_max);
4623         if (p->protocol_max == 0)
4624                 p->protocol_max = p->protocol_min;
4625
4626         if (PRO_VERSION_MAX < p->protocol_min ||
4627             PRO_VERSION_MIN > p->protocol_max)
4628                 goto incompat;
4629
4630         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4631
4632         conn_info(tconn, "Handshake successful: "
4633              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4634
4635         return 1;
4636
4637  incompat:
4638         conn_err(tconn, "incompatible DRBD dialects: "
4639             "I support %d-%d, peer supports %d-%d\n",
4640             PRO_VERSION_MIN, PRO_VERSION_MAX,
4641             p->protocol_min, p->protocol_max);
4642         return -1;
4643 }
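
/*
 * Example of the negotiation above (made-up version numbers): if we
 * support protocol versions 86..101 and the peer advertises 90..96, the
 * ranges overlap and we agree on min(101, 96) = 96.  Had the peer
 * advertised 70..80, the ranges would not overlap and we would take the
 * "incompat" path instead.
 */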
4644
4645 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4646 static int drbd_do_auth(struct drbd_tconn *tconn)
4647 {
4648         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4649         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4650         return -1;
4651 }
4652 #else
4653 #define CHALLENGE_LEN 64
4654
4655 /* Return value:
4656         1 - auth succeeded,
4657         0 - failed, try again (network error),
4658         -1 - auth failed, don't try again.
4659 */
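/*
 * Exchange performed by drbd_do_auth() below, in order:
 *   1. send our random challenge as P_AUTH_CHALLENGE
 *   2. receive the peer's challenge
 *   3. send HMAC(shared secret, peer's challenge) as P_AUTH_RESPONSE
 *   4. receive the peer's response and compare it against
 *      HMAC(shared secret, our challenge)
 * Both sides run the same sequence, so authentication is mutual.
 */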
4660
4661 static int drbd_do_auth(struct drbd_tconn *tconn)
4662 {
4663         struct drbd_socket *sock;
4664         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4665         struct scatterlist sg;
4666         char *response = NULL;
4667         char *right_response = NULL;
4668         char *peers_ch = NULL;
4669         unsigned int key_len;
4670         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4671         unsigned int resp_size;
4672         struct hash_desc desc;
4673         struct packet_info pi;
4674         struct net_conf *nc;
4675         int err, rv;
4676
4677         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4678
4679         rcu_read_lock();
4680         nc = rcu_dereference(tconn->net_conf);
4681         key_len = strlen(nc->shared_secret);
4682         memcpy(secret, nc->shared_secret, key_len);
4683         rcu_read_unlock();
4684
4685         desc.tfm = tconn->cram_hmac_tfm;
4686         desc.flags = 0;
4687
4688         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4689         if (rv) {
4690                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4691                 rv = -1;
4692                 goto fail;
4693         }
4694
4695         get_random_bytes(my_challenge, CHALLENGE_LEN);
4696
4697         sock = &tconn->data;
4698         if (!conn_prepare_command(tconn, sock)) {
4699                 rv = 0;
4700                 goto fail;
4701         }
4702         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4703                                 my_challenge, CHALLENGE_LEN);
4704         if (!rv)
4705                 goto fail;
4706
4707         err = drbd_recv_header(tconn, &pi);
4708         if (err) {
4709                 rv = 0;
4710                 goto fail;
4711         }
4712
4713         if (pi.cmd != P_AUTH_CHALLENGE) {
4714                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4715                          cmdname(pi.cmd), pi.cmd);
4716                 rv = 0;
4717                 goto fail;
4718         }
4719
4720         if (pi.size > CHALLENGE_LEN * 2) {
4721                 conn_err(tconn, "AuthChallenge payload too big.\n");
4722                 rv = -1;
4723                 goto fail;
4724         }
4725
4726         peers_ch = kmalloc(pi.size, GFP_NOIO);
4727         if (peers_ch == NULL) {
4728                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4729                 rv = -1;
4730                 goto fail;
4731         }
4732
4733         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4734         if (err) {
4735                 rv = 0;
4736                 goto fail;
4737         }
4738
4739         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4740         response = kmalloc(resp_size, GFP_NOIO);
4741         if (response == NULL) {
4742                 conn_err(tconn, "kmalloc of response failed\n");
4743                 rv = -1;
4744                 goto fail;
4745         }
4746
4747         sg_init_table(&sg, 1);
4748         sg_set_buf(&sg, peers_ch, pi.size);
4749
4750         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4751         if (rv) {
4752                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4753                 rv = -1;
4754                 goto fail;
4755         }
4756
4757         if (!conn_prepare_command(tconn, sock)) {
4758                 rv = 0;
4759                 goto fail;
4760         }
4761         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4762                                 response, resp_size);
4763         if (!rv)
4764                 goto fail;
4765
4766         err = drbd_recv_header(tconn, &pi);
4767         if (err) {
4768                 rv = 0;
4769                 goto fail;
4770         }
4771
4772         if (pi.cmd != P_AUTH_RESPONSE) {
4773                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4774                          cmdname(pi.cmd), pi.cmd);
4775                 rv = 0;
4776                 goto fail;
4777         }
4778
4779         if (pi.size != resp_size) {
4780                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4781                 rv = 0;
4782                 goto fail;
4783         }
4784
4785         err = drbd_recv_all_warn(tconn, response, resp_size);
4786         if (err) {
4787                 rv = 0;
4788                 goto fail;
4789         }
4790
4791         right_response = kmalloc(resp_size, GFP_NOIO);
4792         if (right_response == NULL) {
4793                 conn_err(tconn, "kmalloc of right_response failed\n");
4794                 rv = -1;
4795                 goto fail;
4796         }
4797
4798         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4799
4800         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4801         if (rv) {
4802                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4803                 rv = -1;
4804                 goto fail;
4805         }
4806
4807         rv = !memcmp(response, right_response, resp_size);
4808
4809         if (rv)
4810                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4811                      resp_size);
4812         else
4813                 rv = -1;
4814
4815  fail:
4816         kfree(peers_ch);
4817         kfree(response);
4818         kfree(right_response);
4819
4820         return rv;
4821 }
4822 #endif
4823
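/*
 * Receiver thread entry point.  conn_connect() returning 0 means no usable
 * connection yet: disconnect, sleep about a second and retry.  -1 means
 * retrying is pointless (see the comment at drbd_do_features() for such
 * cases), so the network configuration is discarded.  A positive return
 * enters the drbdd() receive loop; once that terminates we disconnect and
 * the thread exits.
 */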
4824 int drbdd_init(struct drbd_thread *thi)
4825 {
4826         struct drbd_tconn *tconn = thi->tconn;
4827         int h;
4828
4829         conn_info(tconn, "receiver (re)started\n");
4830
4831         do {
4832                 h = conn_connect(tconn);
4833                 if (h == 0) {
4834                         conn_disconnect(tconn);
4835                         schedule_timeout_interruptible(HZ);
4836                 }
4837                 if (h == -1) {
4838                         conn_warn(tconn, "Discarding network configuration.\n");
4839                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4840                 }
4841         } while (h == 0);
4842
4843         if (h > 0)
4844                 drbdd(tconn);
4845
4846         conn_disconnect(tconn);
4847
4848         conn_info(tconn, "receiver terminated\n");
4849         return 0;
4850 }
4851
4852 /* ********* acknowledge sender ******** */
4853
4854 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4855 {
4856         struct p_req_state_reply *p = pi->data;
4857         int retcode = be32_to_cpu(p->retcode);
4858
4859         if (retcode >= SS_SUCCESS) {
4860                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4861         } else {
4862                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4863                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4864                          drbd_set_st_err_str(retcode), retcode);
4865         }
4866         wake_up(&tconn->ping_wait);
4867
4868         return 0;
4869 }
4870
4871 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4872 {
4873         struct drbd_conf *mdev;
4874         struct p_req_state_reply *p = pi->data;
4875         int retcode = be32_to_cpu(p->retcode);
4876
4877         mdev = vnr_to_mdev(tconn, pi->vnr);
4878         if (!mdev)
4879                 return -EIO;
4880
4881         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4882                 D_ASSERT(tconn->agreed_pro_version < 100);
4883                 return got_conn_RqSReply(tconn, pi);
4884         }
4885
4886         if (retcode >= SS_SUCCESS) {
4887                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4888         } else {
4889                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4890                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4891                         drbd_set_st_err_str(retcode), retcode);
4892         }
4893         wake_up(&mdev->state_wait);
4894
4895         return 0;
4896 }
4897
4898 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4899 {
4900         return drbd_send_ping_ack(tconn);
4901
4902 }
4903
4904 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4905 {
4906         /* restore idle timeout */
4907         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int * HZ;
4908         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4909                 wake_up(&tconn->ping_wait);
4910
4911         return 0;
4912 }
4913
4914 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4915 {
4916         struct drbd_conf *mdev;
4917         struct p_block_ack *p = pi->data;
4918         sector_t sector = be64_to_cpu(p->sector);
4919         int blksize = be32_to_cpu(p->blksize);
4920
4921         mdev = vnr_to_mdev(tconn, pi->vnr);
4922         if (!mdev)
4923                 return -EIO;
4924
4925         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4926
4927         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4928
4929         if (get_ldev(mdev)) {
4930                 drbd_rs_complete_io(mdev, sector);
4931                 drbd_set_in_sync(mdev, sector, blksize);
4932                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4933                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4934                 put_ldev(mdev);
4935         }
4936         dec_rs_pending(mdev);
4937         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4938
4939         return 0;
4940 }
4941
4942 static int
4943 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4944                               struct rb_root *root, const char *func,
4945                               enum drbd_req_event what, bool missing_ok)
4946 {
4947         struct drbd_request *req;
4948         struct bio_and_error m;
4949
4950         spin_lock_irq(&mdev->tconn->req_lock);
4951         req = find_request(mdev, root, id, sector, missing_ok, func);
4952         if (unlikely(!req)) {
4953                 spin_unlock_irq(&mdev->tconn->req_lock);
4954                 return -EIO;
4955         }
4956         __req_mod(req, what, &m);
4957         spin_unlock_irq(&mdev->tconn->req_lock);
4958
4959         if (m.bio)
4960                 complete_master_bio(mdev, &m);
4961         return 0;
4962 }
4963
4964 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4965 {
4966         struct drbd_conf *mdev;
4967         struct p_block_ack *p = pi->data;
4968         sector_t sector = be64_to_cpu(p->sector);
4969         int blksize = be32_to_cpu(p->blksize);
4970         enum drbd_req_event what;
4971
4972         mdev = vnr_to_mdev(tconn, pi->vnr);
4973         if (!mdev)
4974                 return -EIO;
4975
4976         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4977
4978         if (p->block_id == ID_SYNCER) {
4979                 drbd_set_in_sync(mdev, sector, blksize);
4980                 dec_rs_pending(mdev);
4981                 return 0;
4982         }
4983         switch (pi->cmd) {
4984         case P_RS_WRITE_ACK:
4985                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4986                 break;
4987         case P_WRITE_ACK:
4988                 what = WRITE_ACKED_BY_PEER;
4989                 break;
4990         case P_RECV_ACK:
4991                 what = RECV_ACKED_BY_PEER;
4992                 break;
4993         case P_DISCARD_WRITE:
4994                 what = DISCARD_WRITE;
4995                 break;
4996         case P_RETRY_WRITE:
4997                 what = POSTPONE_WRITE;
4998                 break;
4999         default:
5000                 BUG();
5001         }
5002
5003         return validate_req_change_req_state(mdev, p->block_id, sector,
5004                                              &mdev->write_requests, __func__,
5005                                              what, false);
5006 }
5007
5008 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5009 {
5010         struct drbd_conf *mdev;
5011         struct p_block_ack *p = pi->data;
5012         sector_t sector = be64_to_cpu(p->sector);
5013         int size = be32_to_cpu(p->blksize);
5014         int err;
5015
5016         mdev = vnr_to_mdev(tconn, pi->vnr);
5017         if (!mdev)
5018                 return -EIO;
5019
5020         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5021
5022         if (p->block_id == ID_SYNCER) {
5023                 dec_rs_pending(mdev);
5024                 drbd_rs_failed_io(mdev, sector, size);
5025                 return 0;
5026         }
5027
5028         err = validate_req_change_req_state(mdev, p->block_id, sector,
5029                                             &mdev->write_requests, __func__,
5030                                             NEG_ACKED, true);
5031         if (err) {
5032                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5033                    The master bio might already be completed, therefore the
5034                    request is no longer in the collision hash. */
5035                 /* In Protocol B we might already have got a P_RECV_ACK
5036                    but then get a P_NEG_ACK afterwards. */
5037                 drbd_set_out_of_sync(mdev, sector, size);
5038         }
5039         return 0;
5040 }
5041
5042 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5043 {
5044         struct drbd_conf *mdev;
5045         struct p_block_ack *p = pi->data;
5046         sector_t sector = be64_to_cpu(p->sector);
5047
5048         mdev = vnr_to_mdev(tconn, pi->vnr);
5049         if (!mdev)
5050                 return -EIO;
5051
5052         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5053
5054         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5055             (unsigned long long)sector, be32_to_cpu(p->blksize));
5056
5057         return validate_req_change_req_state(mdev, p->block_id, sector,
5058                                              &mdev->read_requests, __func__,
5059                                              NEG_ACKED, false);
5060 }
5061
5062 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5063 {
5064         struct drbd_conf *mdev;
5065         sector_t sector;
5066         int size;
5067         struct p_block_ack *p = pi->data;
5068
5069         mdev = vnr_to_mdev(tconn, pi->vnr);
5070         if (!mdev)
5071                 return -EIO;
5072
5073         sector = be64_to_cpu(p->sector);
5074         size = be32_to_cpu(p->blksize);
5075
5076         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5077
5078         dec_rs_pending(mdev);
5079
5080         if (get_ldev_if_state(mdev, D_FAILED)) {
5081                 drbd_rs_complete_io(mdev, sector);
5082                 switch (pi->cmd) {
5083                 case P_NEG_RS_DREPLY:
5084                         drbd_rs_failed_io(mdev, sector, size);
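                        /* fall through */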
5085                 case P_RS_CANCEL:
5086                         break;
5087                 default:
5088                         BUG();
5089                 }
5090                 put_ldev(mdev);
5091         }
5092
5093         return 0;
5094 }
5095
5096 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5097 {
5098         struct p_barrier_ack *p = pi->data;
5099         struct drbd_conf *mdev;
5100         int vnr;
5101
5102         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5103
5104         rcu_read_lock();
5105         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5106                 if (mdev->state.conn == C_AHEAD &&
5107                     atomic_read(&mdev->ap_in_flight) == 0 &&
5108                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5109                         mdev->start_resync_timer.expires = jiffies + HZ;
5110                         add_timer(&mdev->start_resync_timer);
5111                 }
5112         }
5113         rcu_read_unlock();
5114
5115         return 0;
5116 }
5117
5118 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5119 {
5120         struct drbd_conf *mdev;
5121         struct p_block_ack *p = pi->data;
5122         struct drbd_work *w;
5123         sector_t sector;
5124         int size;
5125
5126         mdev = vnr_to_mdev(tconn, pi->vnr);
5127         if (!mdev)
5128                 return -EIO;
5129
5130         sector = be64_to_cpu(p->sector);
5131         size = be32_to_cpu(p->blksize);
5132
5133         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5134
5135         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5136                 drbd_ov_out_of_sync_found(mdev, sector, size);
5137         else
5138                 ov_out_of_sync_print(mdev);
5139
5140         if (!get_ldev(mdev))
5141                 return 0;
5142
5143         drbd_rs_complete_io(mdev, sector);
5144         dec_rs_pending(mdev);
5145
5146         --mdev->ov_left;
5147
5148         /* let's advance progress step marks only for every other megabyte */
5149         if ((mdev->ov_left & 0x200) == 0x200)
5150                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5151
5152         if (mdev->ov_left == 0) {
5153                 w = kmalloc(sizeof(*w), GFP_NOIO);
5154                 if (w) {
5155                         w->cb = w_ov_finished;
5156                         w->mdev = mdev;
5157                         drbd_queue_work(&mdev->tconn->sender_work, w);
5158                 } else {
5159                         dev_err(DEV, "kmalloc(w) failed.");
5160                         ov_out_of_sync_print(mdev);
5161                         drbd_resync_finished(mdev);
5162                 }
5163         }
5164         put_ldev(mdev);
5165         return 0;
5166 }
5167
5168 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5169 {
5170         return 0;
5171 }
5172
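/*
 * Process the done_ee lists of all volumes: each volume is handled with
 * req_lock released, then a second pass under req_lock checks whether new
 * entries showed up meanwhile; the outer loop repeats until every done_ee
 * list has been seen empty.  Returns 1 if drbd_finish_peer_reqs() failed
 * for some volume, 0 otherwise.
 */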
5173 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5174 {
5175         struct drbd_conf *mdev;
5176         int vnr, not_empty = 0;
5177
5178         do {
5179                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5180                 flush_signals(current);
5181
5182                 rcu_read_lock();
5183                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5184                         kref_get(&mdev->kref);
5185                         rcu_read_unlock();
5186                         if (drbd_finish_peer_reqs(mdev)) {
5187                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5188                                 return 1;
5189                         }
5190                         kref_put(&mdev->kref, &drbd_minor_destroy);
5191                         rcu_read_lock();
5192                 }
5193                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5194
5195                 spin_lock_irq(&tconn->req_lock);
5196                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5197                         not_empty = !list_empty(&mdev->done_ee);
5198                         if (not_empty)
5199                                 break;
5200                 }
5201                 spin_unlock_irq(&tconn->req_lock);
5202                 rcu_read_unlock();
5203         } while (not_empty);
5204
5205         return 0;
5206 }
5207
5208 struct asender_cmd {
5209         size_t pkt_size;
5210         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5211 };
5212
5213 static struct asender_cmd asender_tbl[] = {
5214         [P_PING]            = { 0, got_Ping },
5215         [P_PING_ACK]        = { 0, got_PingAck },
5216         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5217         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5218         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5219         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5220         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5221         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5222         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5223         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5224         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5225         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5226         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5227         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5228         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5229         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5230         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5231 };
5232
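/*
 * Meta socket ("asender") main loop: send a ping when SEND_PING is set,
 * run tconn_finish_peer_reqs() (optionally under TCP cork), then receive
 * and dispatch fixed-size packets via asender_tbl above.  A PingAck that
 * does not arrive within the ping timeout, a receive error, or a failing
 * handler triggers a reconnect (C_NETWORK_FAILURE); an unknown packet
 * type drops the connection to C_DISCONNECTING.
 */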
5233 int drbd_asender(struct drbd_thread *thi)
5234 {
5235         struct drbd_tconn *tconn = thi->tconn;
5236         struct asender_cmd *cmd = NULL;
5237         struct packet_info pi;
5238         int rv;
5239         void *buf    = tconn->meta.rbuf;
5240         int received = 0;
5241         unsigned int header_size = drbd_header_size(tconn);
5242         int expect   = header_size;
5243         bool ping_timeout_active = false;
5244         struct net_conf *nc;
5245         int ping_timeo, tcp_cork, ping_int;
5246
5247         current->policy = SCHED_RR;  /* Make this a realtime task! */
5248         current->rt_priority = 2;    /* more important than all other tasks */
5249
5250         while (get_t_state(thi) == RUNNING) {
5251                 drbd_thread_current_set_cpu(thi);
5252
5253                 rcu_read_lock();
5254                 nc = rcu_dereference(tconn->net_conf);
5255                 ping_timeo = nc->ping_timeo;
5256                 tcp_cork = nc->tcp_cork;
5257                 ping_int = nc->ping_int;
5258                 rcu_read_unlock();
5259
5260                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5261                         if (drbd_send_ping(tconn)) {
5262                                 conn_err(tconn, "drbd_send_ping has failed\n");
5263                                 goto reconnect;
5264                         }
5265                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5266                         ping_timeout_active = true;
5267                 }
5268
5269                 /* TODO: conditionally cork; it may hurt latency if we cork without
5270                    much to send */
5271                 if (tcp_cork)
5272                         drbd_tcp_cork(tconn->meta.socket);
5273                 if (tconn_finish_peer_reqs(tconn)) {
5274                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5275                         goto reconnect;
5276                 }
5277                 /* but unconditionally uncork unless disabled */
5278                 if (tcp_cork)
5279                         drbd_tcp_uncork(tconn->meta.socket);
5280
5281                 /* short circuit, recv_msg would return EINTR anyways. */
5282                 if (signal_pending(current))
5283                         continue;
5284
5285                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5286                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5287
5288                 flush_signals(current);
5289
5290                 /* Note:
5291                  * -EINTR        (on meta) we got a signal
5292                  * -EAGAIN       (on meta) rcvtimeo expired
5293                  * -ECONNRESET   other side closed the connection
5294                  * -ERESTARTSYS  (on data) we got a signal
5295                  * rv <  0       other than above: unexpected error!
5296                  * rv == expected: full header or command
5297                  * rv <  expected: "woken" by signal during receive
5298                  * rv == 0       : "connection shut down by peer"
5299                  */
5300                 if (likely(rv > 0)) {
5301                         received += rv;
5302                         buf      += rv;
5303                 } else if (rv == 0) {
5304                         conn_err(tconn, "meta connection shut down by peer.\n");
5305                         goto reconnect;
5306                 } else if (rv == -EAGAIN) {
5307                         /* If the data socket received something meanwhile,
5308                          * that is good enough: peer is still alive. */
5309                         if (time_after(tconn->last_received,
5310                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5311                                 continue;
5312                         if (ping_timeout_active) {
5313                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5314                                 goto reconnect;
5315                         }
5316                         set_bit(SEND_PING, &tconn->flags);
5317                         continue;
5318                 } else if (rv == -EINTR) {
5319                         continue;
5320                 } else {
5321                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5322                         goto reconnect;
5323                 }
5324
5325                 if (received == expect && cmd == NULL) {
5326                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5327                                 goto reconnect;
5328                         cmd = &asender_tbl[pi.cmd];
5329                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5330                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5331                                          cmdname(pi.cmd), pi.cmd);
5332                                 goto disconnect;
5333                         }
5334                         expect = header_size + cmd->pkt_size;
5335                         if (pi.size != expect - header_size) {
5336                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5337                                         pi.cmd, pi.size);
5338                                 goto reconnect;
5339                         }
5340                 }
5341                 if (received == expect) {
5342                         bool err;
5343
5344                         err = cmd->fn(tconn, &pi);
5345                         if (err) {
5346                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5347                                 goto reconnect;
5348                         }
5349
5350                         tconn->last_received = jiffies;
5351
5352                         if (cmd == &asender_tbl[P_PING_ACK]) {
5353                                 /* restore idle timeout */
5354                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5355                                 ping_timeout_active = false;
5356                         }
5357
5358                         buf      = tconn->meta.rbuf;
5359                         received = 0;
5360                         expect   = header_size;
5361                         cmd      = NULL;
5362                 }
5363         }
5364
5365         if (0) {
5366 reconnect:
5367                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5368         }
5369         if (0) {
5370 disconnect:
5371                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5372         }
5373         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5374
5375         conn_info(tconn, "asender terminated\n");
5376
5377         return 0;
5378 }