drbd: Don't unregister socket state_change callback from within the callback
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
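/* Release every page in the chain via put_page(); returns the number of pages walked. */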
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
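/* Prepend an already linked chain (chain_first .. chain_last) to *head.
 * As with page_chain_del(), locking is the responsibility of the caller. */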
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
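/* Single allocation attempt: take @number pages from the global drbd_pp_pool if
 * enough are vacant, otherwise allocate them one by one with GFP_TRY. On a
 * partial allocation the pages are handed back to the pool and NULL is returned. */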
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
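/* Collect finished net_ee entries under req_lock, then free them outside of it. */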
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (page == NULL)
299                 return;
300
301         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
302                 i = page_chain_free(page);
303         else {
304                 struct page *tmp;
305                 tmp = page_chain_tail(page, &i);
306                 spin_lock(&drbd_pp_lock);
307                 page_chain_add(&drbd_pp_pool, page, tmp);
308                 drbd_pp_vacant += i;
309                 spin_unlock(&drbd_pp_lock);
310         }
311         i = atomic_sub_return(i, a);
312         if (i < 0)
313                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
314                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
315         wake_up(&drbd_pp_wait);
316 }
317
318 /*
319 You need to hold the req_lock:
320  _drbd_wait_ee_list_empty()
321
322 You must not have the req_lock:
323  drbd_free_peer_req()
324  drbd_alloc_peer_req()
325  drbd_free_peer_reqs()
326  drbd_ee_fix_bhs()
327  drbd_finish_peer_reqs()
328  drbd_clear_done_ee()
329  drbd_wait_ee_list_empty()
330 */
331
332 struct drbd_peer_request *
333 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
334                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
335 {
336         struct drbd_peer_request *peer_req;
337         struct page *page = NULL;
338         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
339
340         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
341                 return NULL;
342
343         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
344         if (!peer_req) {
345                 if (!(gfp_mask & __GFP_NOWARN))
346                         dev_err(DEV, "%s: allocation failed\n", __func__);
347                 return NULL;
348         }
349
350         if (data_size) {
351                 page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
352                 if (!page)
353                         goto fail;
354         }
355
356         drbd_clear_interval(&peer_req->i);
357         peer_req->i.size = data_size;
358         peer_req->i.sector = sector;
359         peer_req->i.local = false;
360         peer_req->i.waiting = false;
361
362         peer_req->epoch = NULL;
363         peer_req->w.mdev = mdev;
364         peer_req->pages = page;
365         atomic_set(&peer_req->pending_bios, 0);
366         peer_req->flags = 0;
367         /*
368          * The block_id is opaque to the receiver.  It is not endianness
369          * converted, and sent back to the sender unchanged.
370          */
371         peer_req->block_id = id;
372
373         return peer_req;
374
375  fail:
376         mempool_free(peer_req, drbd_ee_mempool);
377         return NULL;
378 }
379
380 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
381                        int is_net)
382 {
383         if (peer_req->flags & EE_HAS_DIGEST)
384                 kfree(peer_req->digest);
385         drbd_free_pages(mdev, peer_req->pages, is_net);
386         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
387         D_ASSERT(drbd_interval_empty(&peer_req->i));
388         mempool_free(peer_req, drbd_ee_mempool);
389 }
390
391 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
392 {
393         LIST_HEAD(work_list);
394         struct drbd_peer_request *peer_req, *t;
395         int count = 0;
396         int is_net = list == &mdev->net_ee;
397
398         spin_lock_irq(&mdev->tconn->req_lock);
399         list_splice_init(list, &work_list);
400         spin_unlock_irq(&mdev->tconn->req_lock);
401
402         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
403                 __drbd_free_peer_req(mdev, peer_req, is_net);
404                 count++;
405         }
406         return count;
407 }
408
409 /*
410  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
411  */
412 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
413 {
414         LIST_HEAD(work_list);
415         LIST_HEAD(reclaimed);
416         struct drbd_peer_request *peer_req, *t;
417         int err = 0;
418
419         spin_lock_irq(&mdev->tconn->req_lock);
420         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
421         list_splice_init(&mdev->done_ee, &work_list);
422         spin_unlock_irq(&mdev->tconn->req_lock);
423
424         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
425                 drbd_free_net_peer_req(mdev, peer_req);
426
427         /* possible callbacks here:
428          * e_end_block, and e_end_resync_block, e_send_superseded.
429          * all ignore the last argument.
430          */
431         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
432                 int err2;
433
434                 /* list_del not necessary, next/prev members not touched */
435                 err2 = peer_req->w.cb(&peer_req->w, !!err);
436                 if (!err)
437                         err = err2;
438                 drbd_free_peer_req(mdev, peer_req);
439         }
440         wake_up(&mdev->ee_wait);
441
442         return err;
443 }
444
445 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
446                                      struct list_head *head)
447 {
448         DEFINE_WAIT(wait);
449
450         /* avoids spin_lock/unlock
451          * and calling prepare_to_wait in the fast path */
452         while (!list_empty(head)) {
453                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
454                 spin_unlock_irq(&mdev->tconn->req_lock);
455                 io_schedule();
456                 finish_wait(&mdev->ee_wait, &wait);
457                 spin_lock_irq(&mdev->tconn->req_lock);
458         }
459 }
460
461 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
462                                     struct list_head *head)
463 {
464         spin_lock_irq(&mdev->tconn->req_lock);
465         _drbd_wait_ee_list_empty(mdev, head);
466         spin_unlock_irq(&mdev->tconn->req_lock);
467 }
468
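/* One sock_recvmsg() into a kernel buffer; returns bytes received or a negative errno. */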
469 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
470 {
471         mm_segment_t oldfs;
472         struct kvec iov = {
473                 .iov_base = buf,
474                 .iov_len = size,
475         };
476         struct msghdr msg = {
477                 .msg_iovlen = 1,
478                 .msg_iov = (struct iovec *)&iov,
479                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
480         };
481         int rv;
482
483         oldfs = get_fs();
484         set_fs(KERNEL_DS);
485         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
486         set_fs(oldfs);
487
488         return rv;
489 }
490
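/* Receive exactly @size bytes from the data socket (MSG_WAITALL); anything short
 * of that forces the connection into C_BROKEN_PIPE. */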
491 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
492 {
493         mm_segment_t oldfs;
494         struct kvec iov = {
495                 .iov_base = buf,
496                 .iov_len = size,
497         };
498         struct msghdr msg = {
499                 .msg_iovlen = 1,
500                 .msg_iov = (struct iovec *)&iov,
501                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
502         };
503         int rv;
504
505         oldfs = get_fs();
506         set_fs(KERNEL_DS);
507
508         for (;;) {
509                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
510                 if (rv == size)
511                         break;
512
513                 /* Note:
514                  * ECONNRESET   other side closed the connection
515                  * ERESTARTSYS  (on  sock) we got a signal
516                  */
517
518                 if (rv < 0) {
519                         if (rv == -ECONNRESET)
520                                 conn_info(tconn, "sock was reset by peer\n");
521                         else if (rv != -ERESTARTSYS)
522                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
523                         break;
524                 } else if (rv == 0) {
525                         conn_info(tconn, "sock was shut down by peer\n");
526                         break;
527                 } else  {
528                         /* signal came in, or peer/link went down,
529                          * after we read a partial message
530                          */
531                         /* D_ASSERT(signal_pending(current)); */
532                         break;
533                 }
534         };
535
536         set_fs(oldfs);
537
538         if (rv != size)
539                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
540
541         return rv;
542 }
543
544 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
545 {
546         int err;
547
548         err = drbd_recv(tconn, buf, size);
549         if (err != size) {
550                 if (err >= 0)
551                         err = -EIO;
552         } else
553                 err = 0;
554         return err;
555 }
556
557 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
558 {
559         int err;
560
561         err = drbd_recv_all(tconn, buf, size);
562         if (err && !signal_pending(current))
563                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
564         return err;
565 }
566
567 /* quoting tcp(7):
568  *   On individual connections, the socket buffer size must be set prior to the
569  *   listen(2) or connect(2) calls in order to have it take effect.
570  * This is our wrapper to do so.
571  */
572 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
573                 unsigned int rcv)
574 {
575         /* open coded SO_SNDBUF, SO_RCVBUF */
576         if (snd) {
577                 sock->sk->sk_sndbuf = snd;
578                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
579         }
580         if (rcv) {
581                 sock->sk->sk_rcvbuf = rcv;
582                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
583         }
584 }
585
586 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
587 {
588         const char *what;
589         struct socket *sock;
590         struct sockaddr_in6 src_in6;
591         struct sockaddr_in6 peer_in6;
592         struct net_conf *nc;
593         int err, peer_addr_len, my_addr_len;
594         int sndbuf_size, rcvbuf_size, connect_int;
595         int disconnect_on_error = 1;
596
597         rcu_read_lock();
598         nc = rcu_dereference(tconn->net_conf);
599         if (!nc) {
600                 rcu_read_unlock();
601                 return NULL;
602         }
603         sndbuf_size = nc->sndbuf_size;
604         rcvbuf_size = nc->rcvbuf_size;
605         connect_int = nc->connect_int;
606         rcu_read_unlock();
607
608         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
609         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
610
611         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
612                 src_in6.sin6_port = 0;
613         else
614                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
615
616         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
617         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
618
619         what = "sock_create_kern";
620         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
621                                SOCK_STREAM, IPPROTO_TCP, &sock);
622         if (err < 0) {
623                 sock = NULL;
624                 goto out;
625         }
626
627         sock->sk->sk_rcvtimeo =
628         sock->sk->sk_sndtimeo = connect_int * HZ;
629         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
630
631        /* explicitly bind to the configured IP as source IP
632         *  for the outgoing connections.
633         *  This is needed for multihomed hosts and to be
634         *  able to use lo: interfaces for drbd.
635         * Make sure to use 0 as port number, so linux selects
636         *  a free one dynamically.
637         */
638         what = "bind before connect";
639         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
640         if (err < 0)
641                 goto out;
642
643         /* connect may fail, peer not yet available.
644          * stay C_WF_CONNECTION, don't go Disconnecting! */
645         disconnect_on_error = 0;
646         what = "connect";
647         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
648
649 out:
650         if (err < 0) {
651                 if (sock) {
652                         sock_release(sock);
653                         sock = NULL;
654                 }
655                 switch (-err) {
656                         /* timeout, busy, signal pending */
657                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
658                 case EINTR: case ERESTARTSYS:
659                         /* peer not (yet) available, network problem */
660                 case ECONNREFUSED: case ENETUNREACH:
661                 case EHOSTDOWN:    case EHOSTUNREACH:
662                         disconnect_on_error = 0;
663                         break;
664                 default:
665                         conn_err(tconn, "%s failed, err = %d\n", what, err);
666                 }
667                 if (disconnect_on_error)
668                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
669         }
670
671         return sock;
672 }
673
674 struct accept_wait_data {
675         struct drbd_tconn *tconn;
676         struct socket *s_listen;
677         struct completion door_bell;
678         void (*original_sk_state_change)(struct sock *sk);
679
680 };
681
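/* sk_state_change callback of the listen socket: wake up the waiter in
 * drbd_wait_for_connect() once a connection is established, then chain to the
 * original callback. */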
682 static void drbd_incoming_connection(struct sock *sk)
683 {
684         struct accept_wait_data *ad = sk->sk_user_data;
685         void (*state_change)(struct sock *sk);
686
687         state_change = ad->original_sk_state_change;
688         if (sk->sk_state == TCP_ESTABLISHED)
689                 complete(&ad->door_bell);
690         state_change(sk);
691 }
692
693 static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
694 {
695         int err, sndbuf_size, rcvbuf_size, my_addr_len;
696         struct sockaddr_in6 my_addr;
697         struct socket *s_listen;
698         struct net_conf *nc;
699         const char *what;
700
701         rcu_read_lock();
702         nc = rcu_dereference(tconn->net_conf);
703         if (!nc) {
704                 rcu_read_unlock();
705                 return -EIO;
706         }
707         sndbuf_size = nc->sndbuf_size;
708         rcvbuf_size = nc->rcvbuf_size;
709         rcu_read_unlock();
710
711         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
712         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
713
714         what = "sock_create_kern";
715         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
716                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
717         if (err) {
718                 s_listen = NULL;
719                 goto out;
720         }
721
722         s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
723         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
724
725         what = "bind before listen";
726         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
727         if (err < 0)
728                 goto out;
729
730         ad->s_listen = s_listen;
731         write_lock_bh(&s_listen->sk->sk_callback_lock);
732         ad->original_sk_state_change = s_listen->sk->sk_state_change;
733         s_listen->sk->sk_state_change = drbd_incoming_connection;
734         s_listen->sk->sk_user_data = ad;
735         write_unlock_bh(&s_listen->sk->sk_callback_lock);
736
737         what = "listen";
738         err = s_listen->ops->listen(s_listen, 5);
739         if (err < 0)
740                 goto out;
741
742         return 0;
743 out:
744         if (s_listen)
745                 sock_release(s_listen);
746         if (err < 0) {
747                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
748                         conn_err(tconn, "%s failed, err = %d\n", what, err);
749                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
750                 }
751         }
752
753         return -EIO;
754 }
755
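/* Restore the original sk_state_change callback and clear sk_user_data. This is
 * called from drbd_wait_for_connect(), i.e. from process context after the accept,
 * never from within the callback itself (see the commit subject above). */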
756 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
757 {
758         write_lock_bh(&sk->sk_callback_lock);
759         sk->sk_state_change = ad->original_sk_state_change;
760         sk->sk_user_data = NULL;
761         write_unlock_bh(&sk->sk_callback_lock);
762 }
763
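/* Wait (connect_int, with some random jitter) for the door_bell completion, then
 * accept the incoming connection and unregister the state_change hook. */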
764 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
765 {
766         int timeo, connect_int, err = 0;
767         struct socket *s_estab = NULL;
768         struct net_conf *nc;
769
770         rcu_read_lock();
771         nc = rcu_dereference(tconn->net_conf);
772         if (!nc) {
773                 rcu_read_unlock();
774                 return NULL;
775         }
776         connect_int = nc->connect_int;
777         rcu_read_unlock();
778
779         timeo = connect_int * HZ;
780         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
781
782         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
783         if (err <= 0)
784                 return NULL;
785
786         err = kernel_accept(ad->s_listen, &s_estab, 0);
787         if (err < 0) {
788                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
789                         conn_err(tconn, "accept failed, err = %d\n", err);
790                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
791                 }
792         }
793
794         if (s_estab)
795                 unregister_state_change(s_estab->sk, ad);
796
797         return s_estab;
798 }
799
800 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
801
802 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
803                              enum drbd_packet cmd)
804 {
805         if (!conn_prepare_command(tconn, sock))
806                 return -EIO;
807         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
808 }
809
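/* Read and decode a single header from a freshly established socket; returns the
 * packet command (e.g. P_INITIAL_DATA, P_INITIAL_META) or a negative error. */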
810 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
811 {
812         unsigned int header_size = drbd_header_size(tconn);
813         struct packet_info pi;
814         int err;
815
816         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
817         if (err != header_size) {
818                 if (err >= 0)
819                         err = -EIO;
820                 return err;
821         }
822         err = decode_header(tconn, tconn->data.rbuf, &pi);
823         if (err)
824                 return err;
825         return pi.cmd;
826 }
827
828 /**
829  * drbd_socket_okay() - Free the socket if its connection is not okay
830  * @sock:       pointer to the pointer to the socket.
831  */
832 static int drbd_socket_okay(struct socket **sock)
833 {
834         int rr;
835         char tb[4];
836
837         if (!*sock)
838                 return false;
839
840         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
841
842         if (rr > 0 || rr == -EAGAIN) {
843                 return true;
844         } else {
845                 sock_release(*sock);
846                 *sock = NULL;
847                 return false;
848         }
849 }
850 /* Gets called if a connection is established, or if a new minor gets created
851    in a connection */
852 int drbd_connected(struct drbd_conf *mdev)
853 {
854         int err;
855
856         atomic_set(&mdev->packet_seq, 0);
857         mdev->peer_seq = 0;
858
859         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
860                 &mdev->tconn->cstate_mutex :
861                 &mdev->own_state_mutex;
862
863         err = drbd_send_sync_param(mdev);
864         if (!err)
865                 err = drbd_send_sizes(mdev, 0, 0);
866         if (!err)
867                 err = drbd_send_uuids(mdev);
868         if (!err)
869                 err = drbd_send_current_state(mdev);
870         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
871         clear_bit(RESIZE_PENDING, &mdev->flags);
872         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
873         return err;
874 }
875
876 /*
877  * return values:
878  *   1 yes, we have a valid connection
879  *   0 oops, did not work out, please try again
880  *  -1 peer talks different language,
881  *     no point in trying again, please go standalone.
882  *  -2 We do not have a network config...
883  */
884 static int conn_connect(struct drbd_tconn *tconn)
885 {
886         struct drbd_socket sock, msock;
887         struct drbd_conf *mdev;
888         struct net_conf *nc;
889         int vnr, timeout, h, ok;
890         bool discard_my_data;
891         enum drbd_state_rv rv;
892         struct accept_wait_data ad = {
893                 .tconn = tconn,
894                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
895         };
896
897         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
898                 return -2;
899
900         mutex_init(&sock.mutex);
901         sock.sbuf = tconn->data.sbuf;
902         sock.rbuf = tconn->data.rbuf;
903         sock.socket = NULL;
904         mutex_init(&msock.mutex);
905         msock.sbuf = tconn->meta.sbuf;
906         msock.rbuf = tconn->meta.rbuf;
907         msock.socket = NULL;
908
909         /* Assume that the peer only understands protocol 80 until we know better.  */
910         tconn->agreed_pro_version = 80;
911
912         if (prepare_listen_socket(tconn, &ad))
913                 return 0;
914
915         do {
916                 struct socket *s;
917
918                 s = drbd_try_connect(tconn);
919                 if (s) {
920                         if (!sock.socket) {
921                                 sock.socket = s;
922                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
923                         } else if (!msock.socket) {
924                                 clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
925                                 msock.socket = s;
926                                 send_first_packet(tconn, &msock, P_INITIAL_META);
927                         } else {
928                                 conn_err(tconn, "Logic error in conn_connect()\n");
929                                 goto out_release_sockets;
930                         }
931                 }
932
933                 if (sock.socket && msock.socket) {
934                         rcu_read_lock();
935                         nc = rcu_dereference(tconn->net_conf);
936                         timeout = nc->ping_timeo * HZ / 10;
937                         rcu_read_unlock();
938                         schedule_timeout_interruptible(timeout);
939                         ok = drbd_socket_okay(&sock.socket);
940                         ok = drbd_socket_okay(&msock.socket) && ok;
941                         if (ok)
942                                 break;
943                 }
944
945 retry:
946                 s = drbd_wait_for_connect(tconn, &ad);
947                 if (s) {
948                         int fp = receive_first_packet(tconn, s);
949                         drbd_socket_okay(&sock.socket);
950                         drbd_socket_okay(&msock.socket);
951                         switch (fp) {
952                         case P_INITIAL_DATA:
953                                 if (sock.socket) {
954                                         conn_warn(tconn, "initial packet S crossed\n");
955                                         sock_release(sock.socket);
956                                         sock.socket = s;
957                                         goto randomize;
958                                 }
959                                 sock.socket = s;
960                                 break;
961                         case P_INITIAL_META:
962                                 set_bit(RESOLVE_CONFLICTS, &tconn->flags);
963                                 if (msock.socket) {
964                                         conn_warn(tconn, "initial packet M crossed\n");
965                                         sock_release(msock.socket);
966                                         msock.socket = s;
967                                         goto randomize;
968                                 }
969                                 msock.socket = s;
970                                 break;
971                         default:
972                                 conn_warn(tconn, "Error receiving initial packet\n");
973                                 sock_release(s);
974 randomize:
975                                 if (random32() & 1)
976                                         goto retry;
977                         }
978                 }
979
980                 if (tconn->cstate <= C_DISCONNECTING)
981                         goto out_release_sockets;
982                 if (signal_pending(current)) {
983                         flush_signals(current);
984                         smp_rmb();
985                         if (get_t_state(&tconn->receiver) == EXITING)
986                                 goto out_release_sockets;
987                 }
988
989                 ok = drbd_socket_okay(&sock.socket);
990                 ok = drbd_socket_okay(&msock.socket) && ok;
991         } while (!ok);
992
993         if (ad.s_listen)
994                 sock_release(ad.s_listen);
995
996         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
997         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
998
999         sock.socket->sk->sk_allocation = GFP_NOIO;
1000         msock.socket->sk->sk_allocation = GFP_NOIO;
1001
1002         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1003         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1004
1005         /* NOT YET ...
1006          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
1007          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1008          * first set it to the P_CONNECTION_FEATURES timeout,
1009          * which we set to 4x the configured ping_timeout. */
1010         rcu_read_lock();
1011         nc = rcu_dereference(tconn->net_conf);
1012
1013         sock.socket->sk->sk_sndtimeo =
1014         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1015
1016         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1017         timeout = nc->timeout * HZ / 10;
1018         discard_my_data = nc->discard_my_data;
1019         rcu_read_unlock();
1020
1021         msock.socket->sk->sk_sndtimeo = timeout;
1022
1023         /* we don't want delays.
1024          * we use TCP_CORK where appropriate, though */
1025         drbd_tcp_nodelay(sock.socket);
1026         drbd_tcp_nodelay(msock.socket);
1027
1028         tconn->data.socket = sock.socket;
1029         tconn->meta.socket = msock.socket;
1030         tconn->last_received = jiffies;
1031
1032         h = drbd_do_features(tconn);
1033         if (h <= 0)
1034                 return h;
1035
1036         if (tconn->cram_hmac_tfm) {
1037                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
1038                 switch (drbd_do_auth(tconn)) {
1039                 case -1:
1040                         conn_err(tconn, "Authentication of peer failed\n");
1041                         return -1;
1042                 case 0:
1043                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1044                         return 0;
1045                 }
1046         }
1047
1048         tconn->data.socket->sk->sk_sndtimeo = timeout;
1049         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1050
1051         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1052                 return -1;
1053
1054         set_bit(STATE_SENT, &tconn->flags);
1055
1056         rcu_read_lock();
1057         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1058                 kref_get(&mdev->kref);
1059                 rcu_read_unlock();
1060
1061                 if (discard_my_data)
1062                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1063                 else
1064                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1065
1066                 drbd_connected(mdev);
1067                 kref_put(&mdev->kref, &drbd_minor_destroy);
1068                 rcu_read_lock();
1069         }
1070         rcu_read_unlock();
1071
1072         rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1073         if (rv < SS_SUCCESS) {
1074                 clear_bit(STATE_SENT, &tconn->flags);
1075                 return 0;
1076         }
1077
1078         drbd_thread_start(&tconn->asender);
1079
1080         mutex_lock(&tconn->conf_update);
1081         /* The discard_my_data flag is a single-shot modifier to the next
1082          * connection attempt, the handshake of which is now well underway.
1083          * No need for rcu style copying of the whole struct
1084          * just to clear a single value. */
1085         tconn->net_conf->discard_my_data = 0;
1086         mutex_unlock(&tconn->conf_update);
1087
1088         return h;
1089
1090 out_release_sockets:
1091         if (ad.s_listen)
1092                 sock_release(ad.s_listen);
1093         if (sock.socket)
1094                 sock_release(sock.socket);
1095         if (msock.socket)
1096                 sock_release(msock.socket);
1097         return -1;
1098 }
1099
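/* Parse a protocol header (h100, h95 or h80 layout, distinguished by header size
 * and magic) into struct packet_info. */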
1100 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1101 {
1102         unsigned int header_size = drbd_header_size(tconn);
1103
1104         if (header_size == sizeof(struct p_header100) &&
1105             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1106                 struct p_header100 *h = header;
1107                 if (h->pad != 0) {
1108                         conn_err(tconn, "Header padding is not zero\n");
1109                         return -EINVAL;
1110                 }
1111                 pi->vnr = be16_to_cpu(h->volume);
1112                 pi->cmd = be16_to_cpu(h->command);
1113                 pi->size = be32_to_cpu(h->length);
1114         } else if (header_size == sizeof(struct p_header95) &&
1115                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1116                 struct p_header95 *h = header;
1117                 pi->cmd = be16_to_cpu(h->command);
1118                 pi->size = be32_to_cpu(h->length);
1119                 pi->vnr = 0;
1120         } else if (header_size == sizeof(struct p_header80) &&
1121                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1122                 struct p_header80 *h = header;
1123                 pi->cmd = be16_to_cpu(h->command);
1124                 pi->size = be16_to_cpu(h->length);
1125                 pi->vnr = 0;
1126         } else {
1127                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1128                          be32_to_cpu(*(__be32 *)header),
1129                          tconn->agreed_pro_version);
1130                 return -EINVAL;
1131         }
1132         pi->data = header + header_size;
1133         return 0;
1134 }
1135
1136 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1137 {
1138         void *buffer = tconn->data.rbuf;
1139         int err;
1140
1141         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1142         if (err)
1143                 return err;
1144
1145         err = decode_header(tconn, buffer, pi);
1146         tconn->last_received = jiffies;
1147
1148         return err;
1149 }
1150
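/* If the write ordering mode requires it, flush the backing device of every
 * attached volume; on failure fall back to WO_drain_io. */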
1151 static void drbd_flush(struct drbd_tconn *tconn)
1152 {
1153         int rv;
1154         struct drbd_conf *mdev;
1155         int vnr;
1156
1157         if (tconn->write_ordering >= WO_bdev_flush) {
1158                 rcu_read_lock();
1159                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1160                         if (!get_ldev(mdev))
1161                                 continue;
1162                         kref_get(&mdev->kref);
1163                         rcu_read_unlock();
1164
1165                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1166                                         GFP_NOIO, NULL);
1167                         if (rv) {
1168                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1169                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1170                                  * don't try again for ANY return value != 0
1171                                  * if (rv == -EOPNOTSUPP) */
1172                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1173                         }
1174                         put_ldev(mdev);
1175                         kref_put(&mdev->kref, &drbd_minor_destroy);
1176
1177                         rcu_read_lock();
1178                         if (rv)
1179                                 break;
1180                 }
1181                 rcu_read_unlock();
1182         }
1183 }
1184
1185 /**
1186  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1187  * @tconn:      DRBD connection.
1188  * @epoch:      Epoch object.
1189  * @ev:         Epoch event.
1190  */
1191 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1192                                                struct drbd_epoch *epoch,
1193                                                enum epoch_event ev)
1194 {
1195         int epoch_size;
1196         struct drbd_epoch *next_epoch;
1197         enum finish_epoch rv = FE_STILL_LIVE;
1198
1199         spin_lock(&tconn->epoch_lock);
1200         do {
1201                 next_epoch = NULL;
1202
1203                 epoch_size = atomic_read(&epoch->epoch_size);
1204
1205                 switch (ev & ~EV_CLEANUP) {
1206                 case EV_PUT:
1207                         atomic_dec(&epoch->active);
1208                         break;
1209                 case EV_GOT_BARRIER_NR:
1210                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1211                         break;
1212                 case EV_BECAME_LAST:
1213                         /* nothing to do */
1214                         break;
1215                 }
1216
1217                 if (epoch_size != 0 &&
1218                     atomic_read(&epoch->active) == 0 &&
1219                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1220                         if (!(ev & EV_CLEANUP)) {
1221                                 spin_unlock(&tconn->epoch_lock);
1222                                 drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1223                                 spin_lock(&tconn->epoch_lock);
1224                         }
1225 #if 0
1226                         /* FIXME: dec unacked on connection, once we have
1227                          * something to count pending connection packets in. */
1228                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1229                                 dec_unacked(epoch->tconn);
1230 #endif
1231
1232                         if (tconn->current_epoch != epoch) {
1233                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1234                                 list_del(&epoch->list);
1235                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1236                                 tconn->epochs--;
1237                                 kfree(epoch);
1238
1239                                 if (rv == FE_STILL_LIVE)
1240                                         rv = FE_DESTROYED;
1241                         } else {
1242                                 epoch->flags = 0;
1243                                 atomic_set(&epoch->epoch_size, 0);
1244                                 /* atomic_set(&epoch->active, 0); is already zero */
1245                                 if (rv == FE_STILL_LIVE)
1246                                         rv = FE_RECYCLED;
1247                         }
1248                 }
1249
1250                 if (!next_epoch)
1251                         break;
1252
1253                 epoch = next_epoch;
1254         } while (1);
1255
1256         spin_unlock(&tconn->epoch_lock);
1257
1258         return rv;
1259 }
1260
1261 /**
1262  * drbd_bump_write_ordering() - Fall back to another write ordering method
1263  * @tconn:      DRBD connection.
1264  * @wo:         Write ordering method to try.
1265  */
1266 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1267 {
1268         struct disk_conf *dc;
1269         struct drbd_conf *mdev;
1270         enum write_ordering_e pwo;
1271         int vnr;
1272         static char *write_ordering_str[] = {
1273                 [WO_none] = "none",
1274                 [WO_drain_io] = "drain",
1275                 [WO_bdev_flush] = "flush",
1276         };
1277
1278         pwo = tconn->write_ordering;
1279         wo = min(pwo, wo);
1280         rcu_read_lock();
1281         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1282                 if (!get_ldev_if_state(mdev, D_ATTACHING))
1283                         continue;
1284                 dc = rcu_dereference(mdev->ldev->disk_conf);
1285
1286                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1287                         wo = WO_drain_io;
1288                 if (wo == WO_drain_io && !dc->disk_drain)
1289                         wo = WO_none;
1290                 put_ldev(mdev);
1291         }
1292         rcu_read_unlock();
1293         tconn->write_ordering = wo;
1294         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1295                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1296 }
1297
1298 /**
1299  * drbd_submit_peer_request()
1300  * @mdev:       DRBD device.
1301  * @peer_req:   peer request
1302  * @rw:         flag field, see bio->bi_rw
1303  *
1304  * May spread the pages to multiple bios,
1305  * depending on bio_add_page restrictions.
1306  *
1307  * Returns 0 if all bios have been submitted,
1308  * -ENOMEM if we could not allocate enough bios,
1309  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1310  *  single page to an empty bio (which should never happen and likely indicates
1311  *  that the lower level IO stack is in some way broken). This has been observed
1312  *  on certain Xen deployments.
1313  */
1314 /* TODO allocate from our own bio_set. */
1315 int drbd_submit_peer_request(struct drbd_conf *mdev,
1316                              struct drbd_peer_request *peer_req,
1317                              const unsigned rw, const int fault_type)
1318 {
1319         struct bio *bios = NULL;
1320         struct bio *bio;
1321         struct page *page = peer_req->pages;
1322         sector_t sector = peer_req->i.sector;
1323         unsigned ds = peer_req->i.size;
1324         unsigned n_bios = 0;
1325         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1326         int err = -ENOMEM;
1327
1328         /* In most cases, we will only need one bio.  But in case the lower
1329          * level restrictions happen to be different at this offset on this
1330          * side than those of the sending peer, we may need to submit the
1331          * request in more than one bio.
1332          *
1333          * Plain bio_alloc is good enough here, this is no DRBD internally
1334          * generated bio, but a bio allocated on behalf of the peer.
1335          */
1336 next_bio:
1337         bio = bio_alloc(GFP_NOIO, nr_pages);
1338         if (!bio) {
1339                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1340                 goto fail;
1341         }
1342         /* > peer_req->i.sector, unless this is the first bio */
1343         bio->bi_sector = sector;
1344         bio->bi_bdev = mdev->ldev->backing_bdev;
1345         bio->bi_rw = rw;
1346         bio->bi_private = peer_req;
1347         bio->bi_end_io = drbd_peer_request_endio;
1348
1349         bio->bi_next = bios;
1350         bios = bio;
1351         ++n_bios;
1352
1353         page_chain_for_each(page) {
1354                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1355                 if (!bio_add_page(bio, page, len, 0)) {
1356                         /* A single page must always be possible!
1357                          * But in case it fails anyways,
1358                          * we deal with it, and complain (below). */
1359                         if (bio->bi_vcnt == 0) {
1360                                 dev_err(DEV,
1361                                         "bio_add_page failed for len=%u, "
1362                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1363                                         len, (unsigned long long)bio->bi_sector);
1364                                 err = -ENOSPC;
1365                                 goto fail;
1366                         }
1367                         goto next_bio;
1368                 }
1369                 ds -= len;
1370                 sector += len >> 9;
1371                 --nr_pages;
1372         }
1373         D_ASSERT(page == NULL);
1374         D_ASSERT(ds == 0);
1375
1376         atomic_set(&peer_req->pending_bios, n_bios);
1377         do {
1378                 bio = bios;
1379                 bios = bios->bi_next;
1380                 bio->bi_next = NULL;
1381
1382                 drbd_generic_make_request(mdev, fault_type, bio);
1383         } while (bios);
1384         return 0;
1385
1386 fail:
1387         while (bios) {
1388                 bio = bios;
1389                 bios = bios->bi_next;
1390                 bio_put(bio);
1391         }
1392         return err;
1393 }
1394
1395 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1396                                              struct drbd_peer_request *peer_req)
1397 {
1398         struct drbd_interval *i = &peer_req->i;
1399
1400         drbd_remove_interval(&mdev->write_requests, i);
1401         drbd_clear_interval(i);
1402
1403         /* Wake up any processes waiting for this peer request to complete.  */
1404         if (i->waiting)
1405                 wake_up(&mdev->misc_wait);
1406 }
1407
1408 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1409 {
1410         struct drbd_conf *mdev;
1411         int vnr;
1412
1413         rcu_read_lock();
1414         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1415                 kref_get(&mdev->kref);
1416                 rcu_read_unlock();
1417                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1418                 kref_put(&mdev->kref, &drbd_minor_destroy);
1419                 rcu_read_lock();
1420         }
1421         rcu_read_unlock();
1422 }
1423
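/* Handle P_BARRIER: record the barrier number in the current epoch and, depending
 * on the write ordering mode, either recycle it or start a new epoch (possibly
 * only after draining active requests and flushing). */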
1424 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1425 {
1426         int rv;
1427         struct p_barrier *p = pi->data;
1428         struct drbd_epoch *epoch;
1429
1430         /* FIXME these are unacked on connection,
1431          * not a specific (peer)device.
1432          */
1433         tconn->current_epoch->barrier_nr = p->barrier;
1434         tconn->current_epoch->tconn = tconn;
1435         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1436
1437         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1438          * the activity log, which means it would not be resynced in case the
1439          * R_PRIMARY crashes now.
1440          * Therefore we must send the barrier_ack after the barrier request was
1441          * completed. */
1442         switch (tconn->write_ordering) {
1443         case WO_none:
1444                 if (rv == FE_RECYCLED)
1445                         return 0;
1446
1447                 /* receiver context, in the writeout path of the other node.
1448                  * avoid potential distributed deadlock */
1449                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1450                 if (epoch)
1451                         break;
1452                 else
1453                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1454                         /* Fall through */
1455
1456         case WO_bdev_flush:
1457         case WO_drain_io:
1458                 conn_wait_active_ee_empty(tconn);
1459                 drbd_flush(tconn);
1460
1461                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1462                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1463                         if (epoch)
1464                                 break;
1465                 }
1466
1467                 return 0;
1468         default:
1469                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1470                 return -EIO;
1471         }
1472
1473         epoch->flags = 0;
1474         atomic_set(&epoch->epoch_size, 0);
1475         atomic_set(&epoch->active, 0);
1476
1477         spin_lock(&tconn->epoch_lock);
1478         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1479                 list_add(&epoch->list, &tconn->current_epoch->list);
1480                 tconn->current_epoch = epoch;
1481                 tconn->epochs++;
1482         } else {
1483                 /* The current_epoch got recycled while we allocated this one... */
1484                 kfree(epoch);
1485         }
1486         spin_unlock(&tconn->epoch_lock);
1487
1488         return 0;
1489 }
1490
1491 /* used from receive_RSDataReply (recv_resync_read)
1492  * and from receive_Data */
1493 static struct drbd_peer_request *
1494 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1495               int data_size) __must_hold(local)
1496 {
1497         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1498         struct drbd_peer_request *peer_req;
1499         struct page *page;
1500         int dgs, ds, err;
1501         void *dig_in = mdev->tconn->int_dig_in;
1502         void *dig_vv = mdev->tconn->int_dig_vv;
1503         unsigned long *data;
1504
1505         dgs = 0;
1506         if (mdev->tconn->peer_integrity_tfm) {
1507                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1508                 /*
1509                  * FIXME: Receive the incoming digest into the receive buffer
1510                  *        here, together with its struct p_data?
1511                  */
1512                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1513                 if (err)
1514                         return NULL;
1515                 data_size -= dgs;
1516         }
1517
1518         if (!expect(IS_ALIGNED(data_size, 512)))
1519                 return NULL;
1520         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1521                 return NULL;
1522
1523         /* even though we trust our peer,
1524          * we sometimes have to double check. */
1525         if (sector + (data_size>>9) > capacity) {
1526                 dev_err(DEV, "request from peer beyond end of local disk: "
1527                         "capacity: %llus < sector: %llus + size: %u\n",
1528                         (unsigned long long)capacity,
1529                         (unsigned long long)sector, data_size);
1530                 return NULL;
1531         }
1532
1533         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1534          * "criss-cross" setup, that might cause write-out on some other DRBD,
1535          * which in turn might block on the other node at this very place.  */
1536         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1537         if (!peer_req)
1538                 return NULL;
1539
1540         if (!data_size)
1541                 return peer_req;
1542
1543         ds = data_size;
1544         page = peer_req->pages;
1545         page_chain_for_each(page) {
1546                 unsigned len = min_t(int, ds, PAGE_SIZE);
1547                 data = kmap(page);
1548                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1549                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1550                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1551                         data[0] = data[0] ^ (unsigned long)-1;
1552                 }
1553                 kunmap(page);
1554                 if (err) {
1555                         drbd_free_peer_req(mdev, peer_req);
1556                         return NULL;
1557                 }
1558                 ds -= len;
1559         }
1560
1561         if (dgs) {
1562                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1563                 if (memcmp(dig_in, dig_vv, dgs)) {
1564                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1565                                 (unsigned long long)sector, data_size);
1566                         drbd_free_peer_req(mdev, peer_req);
1567                         return NULL;
1568                 }
1569         }
1570         mdev->recv_cnt += data_size>>9;
1571         return peer_req;
1572 }
1573
1574 /* drbd_drain_block() just takes a data block
1575  * out of the socket input buffer, and discards it.
1576  */
1577 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1578 {
1579         struct page *page;
1580         int err = 0;
1581         void *data;
1582
1583         if (!data_size)
1584                 return 0;
1585
1586         page = drbd_alloc_pages(mdev, 1, 1);
1587
1588         data = kmap(page);
1589         while (data_size) {
1590                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1591
1592                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1593                 if (err)
1594                         break;
1595                 data_size -= len;
1596         }
1597         kunmap(page);
1598         drbd_free_pages(mdev, page, 0);
1599         return err;
1600 }
1601
1602 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1603                            sector_t sector, int data_size)
1604 {
1605         struct bio_vec *bvec;
1606         struct bio *bio;
1607         int dgs, err, i, expect;
1608         void *dig_in = mdev->tconn->int_dig_in;
1609         void *dig_vv = mdev->tconn->int_dig_vv;
1610
1611         dgs = 0;
1612         if (mdev->tconn->peer_integrity_tfm) {
1613                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1614                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1615                 if (err)
1616                         return err;
1617                 data_size -= dgs;
1618         }
1619
1620         /* optimistically update recv_cnt.  if receiving fails below,
1621          * we disconnect anyways, and counters will be reset. */
1622         mdev->recv_cnt += data_size>>9;
1623
1624         bio = req->master_bio;
1625         D_ASSERT(sector == bio->bi_sector);
1626
1627         bio_for_each_segment(bvec, bio, i) {
1628                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1629                 expect = min_t(int, data_size, bvec->bv_len);
1630                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1631                 kunmap(bvec->bv_page);
1632                 if (err)
1633                         return err;
1634                 data_size -= expect;
1635         }
1636
1637         if (dgs) {
1638                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1639                 if (memcmp(dig_in, dig_vv, dgs)) {
1640                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1641                         return -EINVAL;
1642                 }
1643         }
1644
1645         D_ASSERT(data_size == 0);
1646         return 0;
1647 }
1648
1649 /*
1650  * e_end_resync_block() is called in asender context via
1651  * drbd_finish_peer_reqs().
1652  */
1653 static int e_end_resync_block(struct drbd_work *w, int unused)
1654 {
1655         struct drbd_peer_request *peer_req =
1656                 container_of(w, struct drbd_peer_request, w);
1657         struct drbd_conf *mdev = w->mdev;
1658         sector_t sector = peer_req->i.sector;
1659         int err;
1660
1661         D_ASSERT(drbd_interval_empty(&peer_req->i));
1662
1663         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1664                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1665                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1666         } else {
1667                 /* Record failure to sync */
1668                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1669
1670                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1671         }
1672         dec_unacked(mdev);
1673
1674         return err;
1675 }
1676
1677 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1678 {
1679         struct drbd_peer_request *peer_req;
1680
1681         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1682         if (!peer_req)
1683                 goto fail;
1684
1685         dec_rs_pending(mdev);
1686
1687         inc_unacked(mdev);
1688         /* corresponding dec_unacked() in e_end_resync_block(),
1689          * or in _drbd_clear_done_ee */
1690
1691         peer_req->w.cb = e_end_resync_block;
1692
1693         spin_lock_irq(&mdev->tconn->req_lock);
1694         list_add(&peer_req->w.list, &mdev->sync_ee);
1695         spin_unlock_irq(&mdev->tconn->req_lock);
1696
1697         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1698         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1699                 return 0;
1700
1701         /* don't care for the reason here */
1702         dev_err(DEV, "submit failed, triggering re-connect\n");
1703         spin_lock_irq(&mdev->tconn->req_lock);
1704         list_del(&peer_req->w.list);
1705         spin_unlock_irq(&mdev->tconn->req_lock);
1706
1707         drbd_free_peer_req(mdev, peer_req);
1708 fail:
1709         put_ldev(mdev);
1710         return -EIO;
1711 }
1712
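     /*
      * The peer echoes the kernel address of our own request object back to us
      * as the block_id.  Before trusting that pointer, verify via
      * drbd_contains_interval() that a request with exactly this interval is
      * really registered in the given tree.
      */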
1713 static struct drbd_request *
1714 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1715              sector_t sector, bool missing_ok, const char *func)
1716 {
1717         struct drbd_request *req;
1718
1719         /* Request object according to our peer */
1720         req = (struct drbd_request *)(unsigned long)id;
1721         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1722                 return req;
1723         if (!missing_ok) {
1724                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1725                         (unsigned long)id, (unsigned long long)sector);
1726         }
1727         return NULL;
1728 }
1729
1730 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1731 {
1732         struct drbd_conf *mdev;
1733         struct drbd_request *req;
1734         sector_t sector;
1735         int err;
1736         struct p_data *p = pi->data;
1737
1738         mdev = vnr_to_mdev(tconn, pi->vnr);
1739         if (!mdev)
1740                 return -EIO;
1741
1742         sector = be64_to_cpu(p->sector);
1743
1744         spin_lock_irq(&mdev->tconn->req_lock);
1745         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1746         spin_unlock_irq(&mdev->tconn->req_lock);
1747         if (unlikely(!req))
1748                 return -EIO;
1749
1750         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1751          * special casing it there for the various failure cases.
1752          * still no race with drbd_fail_pending_reads */
1753         err = recv_dless_read(mdev, req, sector, pi->size);
1754         if (!err)
1755                 req_mod(req, DATA_RECEIVED);
1756         /* else: nothing. handled from drbd_disconnect...
1757          * I don't think we may complete this just yet
1758          * in case we are "on-disconnect: freeze" */
1759
1760         return err;
1761 }
1762
1763 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1764 {
1765         struct drbd_conf *mdev;
1766         sector_t sector;
1767         int err;
1768         struct p_data *p = pi->data;
1769
1770         mdev = vnr_to_mdev(tconn, pi->vnr);
1771         if (!mdev)
1772                 return -EIO;
1773
1774         sector = be64_to_cpu(p->sector);
1775         D_ASSERT(p->block_id == ID_SYNCER);
1776
1777         if (get_ldev(mdev)) {
1778                 /* data is submitted to disk within recv_resync_read.
1779                  * corresponding put_ldev done below on error,
1780                  * or in drbd_peer_request_endio. */
1781                 err = recv_resync_read(mdev, sector, pi->size);
1782         } else {
1783                 if (__ratelimit(&drbd_ratelimit_state))
1784                         dev_err(DEV, "Can not write resync data to local disk.\n");
1785
1786                 err = drbd_drain_block(mdev, pi->size);
1787
1788                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1789         }
1790
1791         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1792
1793         return err;
1794 }
1795
1796 static void restart_conflicting_writes(struct drbd_conf *mdev,
1797                                        sector_t sector, int size)
1798 {
1799         struct drbd_interval *i;
1800         struct drbd_request *req;
1801
1802         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1803                 if (!i->local)
1804                         continue;
1805                 req = container_of(i, struct drbd_request, i);
1806                 if (req->rq_state & RQ_LOCAL_PENDING ||
1807                     !(req->rq_state & RQ_POSTPONED))
1808                         continue;
1809                 /* as it is RQ_POSTPONED, this will cause it to
1810                  * be queued on the retry workqueue. */
1811                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1812         }
1813 }
1814
1815 /*
1816  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1817  */
1818 static int e_end_block(struct drbd_work *w, int cancel)
1819 {
1820         struct drbd_peer_request *peer_req =
1821                 container_of(w, struct drbd_peer_request, w);
1822         struct drbd_conf *mdev = w->mdev;
1823         sector_t sector = peer_req->i.sector;
1824         int err = 0, pcmd;
1825
1826         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1827                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1828                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1829                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1830                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1831                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1832                         err = drbd_send_ack(mdev, pcmd, peer_req);
1833                         if (pcmd == P_RS_WRITE_ACK)
1834                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1835                 } else {
1836                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1837                         /* we expect it to be marked out of sync anyways...
1838                          * maybe assert this?  */
1839                 }
1840                 dec_unacked(mdev);
1841         }
1842         /* we delete from the conflict detection hash _after_ we sent out the
1843          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1844         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1845                 spin_lock_irq(&mdev->tconn->req_lock);
1846                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1847                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1848                 if (peer_req->flags & EE_RESTART_REQUESTS)
1849                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1850                 spin_unlock_irq(&mdev->tconn->req_lock);
1851         } else
1852                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1853
1854         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1855
1856         return err;
1857 }
1858
1859 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1860 {
1861         struct drbd_conf *mdev = w->mdev;
1862         struct drbd_peer_request *peer_req =
1863                 container_of(w, struct drbd_peer_request, w);
1864         int err;
1865
1866         err = drbd_send_ack(mdev, ack, peer_req);
1867         dec_unacked(mdev);
1868
1869         return err;
1870 }
1871
1872 static int e_send_superseded(struct drbd_work *w, int unused)
1873 {
1874         return e_send_ack(w, P_SUPERSEDED);
1875 }
1876
1877 static int e_send_retry_write(struct drbd_work *w, int unused)
1878 {
1879         struct drbd_tconn *tconn = w->mdev->tconn;
1880
1881         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1882                              P_RETRY_WRITE : P_SUPERSEDED);
1883 }
1884
1885 static bool seq_greater(u32 a, u32 b)
1886 {
1887         /*
1888          * We assume 32-bit wrap-around here.
1889          * For 24-bit wrap-around, we would have to shift:
1890          *  a <<= 8; b <<= 8;
1891          */
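             /*
              * For example, seq_greater(5, 0xfffffffa) is true:
              * (s32)5 - (s32)0xfffffffa == 5 - (-6) == 11 > 0, so sequence
              * number 5 counts as newer than 0xfffffffa after the counter
              * wrapped.
              */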
1892         return (s32)a - (s32)b > 0;
1893 }
1894
1895 static u32 seq_max(u32 a, u32 b)
1896 {
1897         return seq_greater(a, b) ? a : b;
1898 }
1899
1900 static bool need_peer_seq(struct drbd_conf *mdev)
1901 {
1902         struct drbd_tconn *tconn = mdev->tconn;
1903         int tp;
1904
1905         /*
1906          * We only need to keep track of the last packet_seq number of our peer
1907          * if we are in dual-primary mode and we have the resolve-conflicts flag set; see
1908          * handle_write_conflicts().
1909          */
1910
1911         rcu_read_lock();
1912         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1913         rcu_read_unlock();
1914
1915         return tp && test_bit(RESOLVE_CONFLICTS, &tconn->flags);
1916 }
1917
1918 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1919 {
1920         unsigned int newest_peer_seq;
1921
1922         if (need_peer_seq(mdev)) {
1923                 spin_lock(&mdev->peer_seq_lock);
1924                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1925                 mdev->peer_seq = newest_peer_seq;
1926                 spin_unlock(&mdev->peer_seq_lock);
1927                 /* wake up only if we actually changed mdev->peer_seq */
1928                 if (peer_seq == newest_peer_seq)
1929                         wake_up(&mdev->seq_wait);
1930         }
1931 }
1932
1933 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1934 {
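             /* e.g. overlaps(0, 4096, 4, 4096) is true: lengths are in bytes,
              * so with 512-byte sectors the first request covers sectors 0..7
              * and the second covers sectors 4..11. */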
1935         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1936 }
1937
1938 /* maybe change sync_ee into interval trees as well? */
1939 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1940 {
1941         struct drbd_peer_request *rs_req;
1942         bool rv = false;
1943
1944         spin_lock_irq(&mdev->tconn->req_lock);
1945         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1946                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1947                              rs_req->i.sector, rs_req->i.size)) {
1948                         rv = true;
1949                         break;
1950                 }
1951         }
1952         spin_unlock_irq(&mdev->tconn->req_lock);
1953
1954         return rv;
1955 }
1956
1957 /* Called from receive_Data.
1958  * Synchronize packets on sock with packets on msock.
1959  *
1960  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1961  * packet traveling on msock, they are still processed in the order they have
1962  * been sent.
1963  *
1964  * Note: we don't care for Ack packets overtaking P_DATA packets.
1965  *
1966  * In case packet_seq is larger than mdev->peer_seq number, there are
1967  * outstanding packets on the msock. We wait for them to arrive.
1968  * In case we are the logically next packet, we update mdev->peer_seq
1969  * ourselves. Correctly handles 32bit wrap around.
1970  *
1971  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1972  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1973  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1974  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1975  *
1976  * returns 0 if we may process the packet,
1977  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
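     /*
      * Illustration: if mdev->peer_seq is currently 7, a packet carrying
      * peer_seq 8 is the logically next one and may be processed right away,
      * while a packet carrying peer_seq 9 makes us wait until mdev->peer_seq
      * has caught up (the missing packet was processed), or the wait times
      * out.
      */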
1978 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1979 {
1980         DEFINE_WAIT(wait);
1981         long timeout;
1982         int ret;
1983
1984         if (!need_peer_seq(mdev))
1985                 return 0;
1986
1987         spin_lock(&mdev->peer_seq_lock);
1988         for (;;) {
1989                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1990                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1991                         ret = 0;
1992                         break;
1993                 }
1994                 if (signal_pending(current)) {
1995                         ret = -ERESTARTSYS;
1996                         break;
1997                 }
1998                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1999                 spin_unlock(&mdev->peer_seq_lock);
2000                 rcu_read_lock();
2001                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
2002                 rcu_read_unlock();
2003                 timeout = schedule_timeout(timeout);
2004                 spin_lock(&mdev->peer_seq_lock);
2005                 if (!timeout) {
2006                         ret = -ETIMEDOUT;
2007                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
2008                         break;
2009                 }
2010         }
2011         spin_unlock(&mdev->peer_seq_lock);
2012         finish_wait(&mdev->seq_wait, &wait);
2013         return ret;
2014 }
2015
2016 /* see also bio_flags_to_wire()
2017  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2018  * flags and back. We may replicate to other kernel versions. */
2019 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2020 {
2021         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2022                 (dpf & DP_FUA ? REQ_FUA : 0) |
2023                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2024                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2025 }
2026
2027 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2028                                     unsigned int size)
2029 {
2030         struct drbd_interval *i;
2031
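             /* __req_mod(..., NEG_ACKED, ...) may hand back a master bio to
              * complete; complete_master_bio() is called outside req_lock, and
              * since the interval tree may change while the lock is dropped,
              * the overlap scan is restarted from the top each time. */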
2032     repeat:
2033         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2034                 struct drbd_request *req;
2035                 struct bio_and_error m;
2036
2037                 if (!i->local)
2038                         continue;
2039                 req = container_of(i, struct drbd_request, i);
2040                 if (!(req->rq_state & RQ_POSTPONED))
2041                         continue;
2042                 req->rq_state &= ~RQ_POSTPONED;
2043                 __req_mod(req, NEG_ACKED, &m);
2044                 spin_unlock_irq(&mdev->tconn->req_lock);
2045                 if (m.bio)
2046                         complete_master_bio(mdev, &m);
2047                 spin_lock_irq(&mdev->tconn->req_lock);
2048                 goto repeat;
2049         }
2050 }
2051
2052 static int handle_write_conflicts(struct drbd_conf *mdev,
2053                                   struct drbd_peer_request *peer_req)
2054 {
2055         struct drbd_tconn *tconn = mdev->tconn;
2056         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2057         sector_t sector = peer_req->i.sector;
2058         const unsigned int size = peer_req->i.size;
2059         struct drbd_interval *i;
2060         bool equal;
2061         int err;
2062
2063         /*
2064          * Inserting the peer request into the write_requests tree will prevent
2065          * new conflicting local requests from being added.
2066          */
2067         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2068
2069     repeat:
2070         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2071                 if (i == &peer_req->i)
2072                         continue;
2073
2074                 if (!i->local) {
2075                         /*
2076                          * Our peer has sent a conflicting remote request; this
2077                          * should not happen in a two-node setup.  Wait for the
2078                          * earlier peer request to complete.
2079                          */
2080                         err = drbd_wait_misc(mdev, i);
2081                         if (err)
2082                                 goto out;
2083                         goto repeat;
2084                 }
2085
2086                 equal = i->sector == sector && i->size == size;
2087                 if (resolve_conflicts) {
2088                         /*
2089                          * If the peer request is fully contained within the
2090                          * overlapping request, it can be considered overwritten
2091                          * and thus superseded; otherwise, it will be retried
2092                          * once all overlapping requests have completed.
2093                          */
2094                         bool superseded = i->sector <= sector && i->sector +
2095                                        (i->size >> 9) >= sector + (size >> 9);
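                              /* e.g. a local request covering sectors 0..15
                               * fully contains a peer request covering sectors
                               * 4..11, so the peer request is superseded; had
                               * it extended to sector 19, it would be retried
                               * instead. */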
2096
2097                         if (!equal)
2098                                 dev_alert(DEV, "Concurrent writes detected: "
2099                                                "local=%llus +%u, remote=%llus +%u, "
2100                                                "assuming %s came first\n",
2101                                           (unsigned long long)i->sector, i->size,
2102                                           (unsigned long long)sector, size,
2103                                           superseded ? "local" : "remote");
2104
2105                         inc_unacked(mdev);
2106                         peer_req->w.cb = superseded ? e_send_superseded :
2107                                                    e_send_retry_write;
2108                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2109                         wake_asender(mdev->tconn);
2110
2111                         err = -ENOENT;
2112                         goto out;
2113                 } else {
2114                         struct drbd_request *req =
2115                                 container_of(i, struct drbd_request, i);
2116
2117                         if (!equal)
2118                                 dev_alert(DEV, "Concurrent writes detected: "
2119                                                "local=%llus +%u, remote=%llus +%u\n",
2120                                           (unsigned long long)i->sector, i->size,
2121                                           (unsigned long long)sector, size);
2122
2123                         if (req->rq_state & RQ_LOCAL_PENDING ||
2124                             !(req->rq_state & RQ_POSTPONED)) {
2125                                 /*
2126                                  * Wait for the node with the discard flag to
2127                                  * decide if this request has been superseded
2128                                  * or needs to be retried.
2129                                  * Requests that have been superseded will
2130                                  * disappear from the write_requests tree.
2131                                  *
2132                                  * In addition, wait for the conflicting
2133                                  * request to finish locally before submitting
2134                                  * the conflicting peer request.
2135                                  */
2136                                 err = drbd_wait_misc(mdev, &req->i);
2137                                 if (err) {
2138                                         _conn_request_state(mdev->tconn,
2139                                                             NS(conn, C_TIMEOUT),
2140                                                             CS_HARD);
2141                                         fail_postponed_requests(mdev, sector, size);
2142                                         goto out;
2143                                 }
2144                                 goto repeat;
2145                         }
2146                         /*
2147                          * Remember to restart the conflicting requests after
2148                          * the new peer request has completed.
2149                          */
2150                         peer_req->flags |= EE_RESTART_REQUESTS;
2151                 }
2152         }
2153         err = 0;
2154
2155     out:
2156         if (err)
2157                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2158         return err;
2159 }
2160
2161 /* mirrored write */
2162 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2163 {
2164         struct drbd_conf *mdev;
2165         sector_t sector;
2166         struct drbd_peer_request *peer_req;
2167         struct p_data *p = pi->data;
2168         u32 peer_seq = be32_to_cpu(p->seq_num);
2169         int rw = WRITE;
2170         u32 dp_flags;
2171         int err, tp;
2172
2173         mdev = vnr_to_mdev(tconn, pi->vnr);
2174         if (!mdev)
2175                 return -EIO;
2176
2177         if (!get_ldev(mdev)) {
2178                 int err2;
2179
2180                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2181                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2182                 atomic_inc(&tconn->current_epoch->epoch_size);
2183                 err2 = drbd_drain_block(mdev, pi->size);
2184                 if (!err)
2185                         err = err2;
2186                 return err;
2187         }
2188
2189         /*
2190          * Corresponding put_ldev done either below (on various errors), or in
2191          * drbd_peer_request_endio, if we successfully submit the data at the
2192          * end of this function.
2193          */
2194
2195         sector = be64_to_cpu(p->sector);
2196         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2197         if (!peer_req) {
2198                 put_ldev(mdev);
2199                 return -EIO;
2200         }
2201
2202         peer_req->w.cb = e_end_block;
2203
2204         dp_flags = be32_to_cpu(p->dp_flags);
2205         rw |= wire_flags_to_bio(mdev, dp_flags);
2206         if (peer_req->pages == NULL) {
2207                 D_ASSERT(peer_req->i.size == 0);
2208                 D_ASSERT(dp_flags & DP_FLUSH);
2209         }
2210
2211         if (dp_flags & DP_MAY_SET_IN_SYNC)
2212                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2213
2214         spin_lock(&tconn->epoch_lock);
2215         peer_req->epoch = tconn->current_epoch;
2216         atomic_inc(&peer_req->epoch->epoch_size);
2217         atomic_inc(&peer_req->epoch->active);
2218         spin_unlock(&tconn->epoch_lock);
2219
2220         rcu_read_lock();
2221         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2222         rcu_read_unlock();
2223         if (tp) {
2224                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2225                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2226                 if (err)
2227                         goto out_interrupted;
2228                 spin_lock_irq(&mdev->tconn->req_lock);
2229                 err = handle_write_conflicts(mdev, peer_req);
2230                 if (err) {
2231                         spin_unlock_irq(&mdev->tconn->req_lock);
2232                         if (err == -ENOENT) {
2233                                 put_ldev(mdev);
2234                                 return 0;
2235                         }
2236                         goto out_interrupted;
2237                 }
2238         } else
2239                 spin_lock_irq(&mdev->tconn->req_lock);
2240         list_add(&peer_req->w.list, &mdev->active_ee);
2241         spin_unlock_irq(&mdev->tconn->req_lock);
2242
2243         if (mdev->state.conn == C_SYNC_TARGET)
2244                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2245
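             /* With an agreed protocol older than 100, the ack-mode flags
              * (DP_SEND_WRITE_ACK / DP_SEND_RECEIVE_ACK) are not part of
              * dp_flags on the wire, so derive them here from the configured
              * wire protocol (protocol C: write ack, protocol B: receive
              * ack). */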
2246         if (mdev->tconn->agreed_pro_version < 100) {
2247                 rcu_read_lock();
2248                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2249                 case DRBD_PROT_C:
2250                         dp_flags |= DP_SEND_WRITE_ACK;
2251                         break;
2252                 case DRBD_PROT_B:
2253                         dp_flags |= DP_SEND_RECEIVE_ACK;
2254                         break;
2255                 }
2256                 rcu_read_unlock();
2257         }
2258
2259         if (dp_flags & DP_SEND_WRITE_ACK) {
2260                 peer_req->flags |= EE_SEND_WRITE_ACK;
2261                 inc_unacked(mdev);
2262                 /* corresponding dec_unacked() in e_end_block(),
2263                  * or in _drbd_clear_done_ee */
2264         }
2265
2266         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2267                 /* I really don't like it that the receiver thread
2268                  * sends on the msock, but anyways */
2269                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2270         }
2271
2272                 /* In case we have the only disk of the cluster, mark the block out of sync. */
2273                 /* In case we have the only disk of the cluster, */
2274                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2275                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2276                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2277                 drbd_al_begin_io(mdev, &peer_req->i);
2278         }
2279
2280         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2281         if (!err)
2282                 return 0;
2283
2284         /* don't care for the reason here */
2285         dev_err(DEV, "submit failed, triggering re-connect\n");
2286         spin_lock_irq(&mdev->tconn->req_lock);
2287         list_del(&peer_req->w.list);
2288         drbd_remove_epoch_entry_interval(mdev, peer_req);
2289         spin_unlock_irq(&mdev->tconn->req_lock);
2290         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2291                 drbd_al_complete_io(mdev, &peer_req->i);
2292
2293 out_interrupted:
2294         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2295         put_ldev(mdev);
2296         drbd_free_peer_req(mdev, peer_req);
2297         return err;
2298 }
2299
2300 /* We may throttle resync, if the lower device seems to be busy,
2301  * and current sync rate is above c_min_rate.
2302  *
2303  * To decide whether or not the lower device is busy, we use a scheme similar
2304  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2305  * amount (more than 64 sectors) of activity we cannot account for with our own resync
2306  * activity, it obviously is "busy".
2307  *
2308  * The current sync rate used here is based only on the most recent two step marks,
2309  * to have a short time average so we can react faster.
2310  */
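     /*
      * Rough example with made-up numbers: if the two most recent sync marks
      * show that 2000 bitmap bits were cleared over 2 seconds, then with the
      * usual 4 KiB per bitmap bit dbdt = Bit2KB(2000 / 2) = 4000 KiB/s, and we
      * throttle whenever the configured c_min_rate is below that.
      */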
2311 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2312 {
2313         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2314         unsigned long db, dt, dbdt;
2315         struct lc_element *tmp;
2316         int curr_events;
2317         int throttle = 0;
2318         unsigned int c_min_rate;
2319
2320         rcu_read_lock();
2321         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2322         rcu_read_unlock();
2323
2324         /* feature disabled? */
2325         if (c_min_rate == 0)
2326                 return 0;
2327
2328         spin_lock_irq(&mdev->al_lock);
2329         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2330         if (tmp) {
2331                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2332                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2333                         spin_unlock_irq(&mdev->al_lock);
2334                         return 0;
2335                 }
2336                 /* Do not slow down if app IO is already waiting for this extent */
2337         }
2338         spin_unlock_irq(&mdev->al_lock);
2339
2340         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2341                       (int)part_stat_read(&disk->part0, sectors[1]) -
2342                         atomic_read(&mdev->rs_sect_ev);
2343
2344         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2345                 unsigned long rs_left;
2346                 int i;
2347
2348                 mdev->rs_last_events = curr_events;
2349
2350                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2351                  * approx. */
2352                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2353
2354                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2355                         rs_left = mdev->ov_left;
2356                 else
2357                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2358
2359                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2360                 if (!dt)
2361                         dt++;
2362                 db = mdev->rs_mark_left[i] - rs_left;
2363                 dbdt = Bit2KB(db/dt);
2364
2365                 if (dbdt > c_min_rate)
2366                         throttle = 1;
2367         }
2368         return throttle;
2369 }
2370
2371
2372 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2373 {
2374         struct drbd_conf *mdev;
2375         sector_t sector;
2376         sector_t capacity;
2377         struct drbd_peer_request *peer_req;
2378         struct digest_info *di = NULL;
2379         int size, verb;
2380         unsigned int fault_type;
2381         struct p_block_req *p = pi->data;
2382
2383         mdev = vnr_to_mdev(tconn, pi->vnr);
2384         if (!mdev)
2385                 return -EIO;
2386         capacity = drbd_get_capacity(mdev->this_bdev);
2387
2388         sector = be64_to_cpu(p->sector);
2389         size   = be32_to_cpu(p->blksize);
2390
2391         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2392                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2393                                 (unsigned long long)sector, size);
2394                 return -EINVAL;
2395         }
2396         if (sector + (size>>9) > capacity) {
2397                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2398                                 (unsigned long long)sector, size);
2399                 return -EINVAL;
2400         }
2401
2402         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2403                 verb = 1;
2404                 switch (pi->cmd) {
2405                 case P_DATA_REQUEST:
2406                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2407                         break;
2408                 case P_RS_DATA_REQUEST:
2409                 case P_CSUM_RS_REQUEST:
2410                 case P_OV_REQUEST:
2411                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2412                         break;
2413                 case P_OV_REPLY:
2414                         verb = 0;
2415                         dec_rs_pending(mdev);
2416                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2417                         break;
2418                 default:
2419                         BUG();
2420                 }
2421                 if (verb && __ratelimit(&drbd_ratelimit_state))
2422                         dev_err(DEV, "Can not satisfy peer's read request, "
2423                             "no local data.\n");
2424
2425                 /* drain the possible payload */
2426                 return drbd_drain_block(mdev, pi->size);
2427         }
2428
2429         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2430          * "criss-cross" setup, that might cause write-out on some other DRBD,
2431          * which in turn might block on the other node at this very place.  */
2432         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2433         if (!peer_req) {
2434                 put_ldev(mdev);
2435                 return -ENOMEM;
2436         }
2437
2438         switch (pi->cmd) {
2439         case P_DATA_REQUEST:
2440                 peer_req->w.cb = w_e_end_data_req;
2441                 fault_type = DRBD_FAULT_DT_RD;
2442                 /* application IO, don't drbd_rs_begin_io */
2443                 goto submit;
2444
2445         case P_RS_DATA_REQUEST:
2446                 peer_req->w.cb = w_e_end_rsdata_req;
2447                 fault_type = DRBD_FAULT_RS_RD;
2448                 /* used in the sector offset progress display */
2449                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2450                 break;
2451
2452         case P_OV_REPLY:
2453         case P_CSUM_RS_REQUEST:
2454                 fault_type = DRBD_FAULT_RS_RD;
2455                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2456                 if (!di)
2457                         goto out_free_e;
2458
2459                 di->digest_size = pi->size;
2460                 di->digest = (((char *)di)+sizeof(struct digest_info));
2461
2462                 peer_req->digest = di;
2463                 peer_req->flags |= EE_HAS_DIGEST;
2464
2465                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2466                         goto out_free_e;
2467
2468                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2469                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2470                         peer_req->w.cb = w_e_end_csum_rs_req;
2471                         /* used in the sector offset progress display */
2472                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2473                 } else if (pi->cmd == P_OV_REPLY) {
2474                         /* track progress, we may need to throttle */
2475                         atomic_add(size >> 9, &mdev->rs_sect_in);
2476                         peer_req->w.cb = w_e_end_ov_reply;
2477                         dec_rs_pending(mdev);
2478                         /* drbd_rs_begin_io done when we sent this request,
2479                          * but accounting still needs to be done. */
2480                         goto submit_for_resync;
2481                 }
2482                 break;
2483
2484         case P_OV_REQUEST:
2485                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2486                     mdev->tconn->agreed_pro_version >= 90) {
2487                         unsigned long now = jiffies;
2488                         int i;
2489                         mdev->ov_start_sector = sector;
2490                         mdev->ov_position = sector;
2491                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2492                         mdev->rs_total = mdev->ov_left;
2493                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2494                                 mdev->rs_mark_left[i] = mdev->ov_left;
2495                                 mdev->rs_mark_time[i] = now;
2496                         }
2497                         dev_info(DEV, "Online Verify start sector: %llu\n",
2498                                         (unsigned long long)sector);
2499                 }
2500                 peer_req->w.cb = w_e_end_ov_req;
2501                 fault_type = DRBD_FAULT_RS_RD;
2502                 break;
2503
2504         default:
2505                 BUG();
2506         }
2507
2508         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2509          * wrt the receiver, but it is not as straightforward as it may seem.
2510          * Various places in the resync start and stop logic assume resync
2511          * requests are processed in order, requeuing this on the worker thread
2512          * introduces a bunch of new code for synchronization between threads.
2513          *
2514          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2515          * "forever", throttling after drbd_rs_begin_io will lock that extent
2516          * for application writes for the same time.  For now, just throttle
2517          * here, where the rest of the code expects the receiver to sleep for
2518          * a while, anyways.
2519          */
2520
2521         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2522          * this defers syncer requests for some time, before letting at least
2523          * one request through.  The resync controller on the receiving side
2524          * will adapt to the incoming rate accordingly.
2525          *
2526          * We cannot throttle here if remote is Primary/SyncTarget:
2527          * we would also throttle its application reads.
2528          * In that case, throttling is done on the SyncTarget only.
2529          */
2530         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2531                 schedule_timeout_uninterruptible(HZ/10);
2532         if (drbd_rs_begin_io(mdev, sector))
2533                 goto out_free_e;
2534
2535 submit_for_resync:
2536         atomic_add(size >> 9, &mdev->rs_sect_ev);
2537
2538 submit:
2539         inc_unacked(mdev);
2540         spin_lock_irq(&mdev->tconn->req_lock);
2541         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2542         spin_unlock_irq(&mdev->tconn->req_lock);
2543
2544         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2545                 return 0;
2546
2547         /* don't care for the reason here */
2548         dev_err(DEV, "submit failed, triggering re-connect\n");
2549         spin_lock_irq(&mdev->tconn->req_lock);
2550         list_del(&peer_req->w.list);
2551         spin_unlock_irq(&mdev->tconn->req_lock);
2552         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2553
2554 out_free_e:
2555         put_ldev(mdev);
2556         drbd_free_peer_req(mdev, peer_req);
2557         return -EIO;
2558 }
2559
2560 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2561 {
2562         int self, peer, rv = -100;
2563         unsigned long ch_self, ch_peer;
2564         enum drbd_after_sb_p after_sb_0p;
2565
2566         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2567         peer = mdev->p_uuid[UI_BITMAP] & 1;
2568
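             /* ch_self/ch_peer: out-of-sync block counts of the two nodes; the
              * peer reports its count in the UI_SIZE slot of the UUID packet.
              * Used by the discard-least-changes policy below. */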
2569         ch_peer = mdev->p_uuid[UI_SIZE];
2570         ch_self = mdev->comm_bm_set;
2571
2572         rcu_read_lock();
2573         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2574         rcu_read_unlock();
2575         switch (after_sb_0p) {
2576         case ASB_CONSENSUS:
2577         case ASB_DISCARD_SECONDARY:
2578         case ASB_CALL_HELPER:
2579         case ASB_VIOLENTLY:
2580                 dev_err(DEV, "Configuration error.\n");
2581                 break;
2582         case ASB_DISCONNECT:
2583                 break;
2584         case ASB_DISCARD_YOUNGER_PRI:
2585                 if (self == 0 && peer == 1) {
2586                         rv = -1;
2587                         break;
2588                 }
2589                 if (self == 1 && peer == 0) {
2590                         rv =  1;
2591                         break;
2592                 }
2593                 /* Else fall through to one of the other strategies... */
2594         case ASB_DISCARD_OLDER_PRI:
2595                 if (self == 0 && peer == 1) {
2596                         rv = 1;
2597                         break;
2598                 }
2599                 if (self == 1 && peer == 0) {
2600                         rv = -1;
2601                         break;
2602                 }
2603                 /* Else fall through to one of the other strategies... */
2604                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2605                      "Using discard-least-changes instead\n");
2606         case ASB_DISCARD_ZERO_CHG:
2607                 if (ch_peer == 0 && ch_self == 0) {
2608                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2609                                 ? -1 : 1;
2610                         break;
2611                 } else {
2612                         if (ch_peer == 0) { rv =  1; break; }
2613                         if (ch_self == 0) { rv = -1; break; }
2614                 }
2615                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2616                         break;
2617         case ASB_DISCARD_LEAST_CHG:
2618                 if      (ch_self < ch_peer)
2619                         rv = -1;
2620                 else if (ch_self > ch_peer)
2621                         rv =  1;
2622                 else /* ( ch_self == ch_peer ) */
2623                      /* Well, then use something else. */
2624                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2625                                 ? -1 : 1;
2626                 break;
2627         case ASB_DISCARD_LOCAL:
2628                 rv = -1;
2629                 break;
2630         case ASB_DISCARD_REMOTE:
2631                 rv =  1;
2632         }
2633
2634         return rv;
2635 }
2636
2637 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2638 {
2639         int hg, rv = -100;
2640         enum drbd_after_sb_p after_sb_1p;
2641
2642         rcu_read_lock();
2643         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2644         rcu_read_unlock();
2645         switch (after_sb_1p) {
2646         case ASB_DISCARD_YOUNGER_PRI:
2647         case ASB_DISCARD_OLDER_PRI:
2648         case ASB_DISCARD_LEAST_CHG:
2649         case ASB_DISCARD_LOCAL:
2650         case ASB_DISCARD_REMOTE:
2651         case ASB_DISCARD_ZERO_CHG:
2652                 dev_err(DEV, "Configuration error.\n");
2653                 break;
2654         case ASB_DISCONNECT:
2655                 break;
2656         case ASB_CONSENSUS:
2657                 hg = drbd_asb_recover_0p(mdev);
2658                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2659                         rv = hg;
2660                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2661                         rv = hg;
2662                 break;
2663         case ASB_VIOLENTLY:
2664                 rv = drbd_asb_recover_0p(mdev);
2665                 break;
2666         case ASB_DISCARD_SECONDARY:
2667                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2668         case ASB_CALL_HELPER:
2669                 hg = drbd_asb_recover_0p(mdev);
2670                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2671                         enum drbd_state_rv rv2;
2672
2673                         drbd_set_role(mdev, R_SECONDARY, 0);
2674                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2675                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2676                           * we do not need to wait for the after state change work either. */
2677                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2678                         if (rv2 != SS_SUCCESS) {
2679                                 drbd_khelper(mdev, "pri-lost-after-sb");
2680                         } else {
2681                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2682                                 rv = hg;
2683                         }
2684                 } else
2685                         rv = hg;
2686         }
2687
2688         return rv;
2689 }
2690
2691 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2692 {
2693         int hg, rv = -100;
2694         enum drbd_after_sb_p after_sb_2p;
2695
2696         rcu_read_lock();
2697         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2698         rcu_read_unlock();
2699         switch (after_sb_2p) {
2700         case ASB_DISCARD_YOUNGER_PRI:
2701         case ASB_DISCARD_OLDER_PRI:
2702         case ASB_DISCARD_LEAST_CHG:
2703         case ASB_DISCARD_LOCAL:
2704         case ASB_DISCARD_REMOTE:
2705         case ASB_CONSENSUS:
2706         case ASB_DISCARD_SECONDARY:
2707         case ASB_DISCARD_ZERO_CHG:
2708                 dev_err(DEV, "Configuration error.\n");
2709                 break;
2710         case ASB_VIOLENTLY:
2711                 rv = drbd_asb_recover_0p(mdev);
2712                 break;
2713         case ASB_DISCONNECT:
2714                 break;
2715         case ASB_CALL_HELPER:
2716                 hg = drbd_asb_recover_0p(mdev);
2717                 if (hg == -1) {
2718                         enum drbd_state_rv rv2;
2719
2720                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2721                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2722                           * we do not need to wait for the after state change work either. */
2723                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2724                         if (rv2 != SS_SUCCESS) {
2725                                 drbd_khelper(mdev, "pri-lost-after-sb");
2726                         } else {
2727                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2728                                 rv = hg;
2729                         }
2730                 } else
2731                         rv = hg;
2732         }
2733
2734         return rv;
2735 }
2736
2737 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2738                            u64 bits, u64 flags)
2739 {
2740         if (!uuid) {
2741                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2742                 return;
2743         }
2744         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2745              text,
2746              (unsigned long long)uuid[UI_CURRENT],
2747              (unsigned long long)uuid[UI_BITMAP],
2748              (unsigned long long)uuid[UI_HISTORY_START],
2749              (unsigned long long)uuid[UI_HISTORY_END],
2750              (unsigned long long)bits,
2751              (unsigned long long)flags);
2752 }
2753
2754 /*
2755   100   after split brain try auto recover
2756     2   C_SYNC_SOURCE set BitMap
2757     1   C_SYNC_SOURCE use BitMap
2758     0   no Sync
2759    -1   C_SYNC_TARGET use BitMap
2760    -2   C_SYNC_TARGET set BitMap
2761  -100   after split brain, disconnect
2762 -1000   unrelated data
2763 -1091   requires proto 91
2764 -1096   requires proto 96
2765  */
2766 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2767 {
2768         u64 self, peer;
2769         int i, j;
2770
2771         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2772         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2773
2774         *rule_nr = 10;
2775         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2776                 return 0;
2777
2778         *rule_nr = 20;
2779         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2780              peer != UUID_JUST_CREATED)
2781                 return -2;
2782
2783         *rule_nr = 30;
2784         if (self != UUID_JUST_CREATED &&
2785             (peer == UUID_JUST_CREATED || peer == (u64)0))
2786                 return 2;
2787
2788         if (self == peer) {
2789                 int rct, dc; /* roles at crash time */
2790
2791                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2792
2793                         if (mdev->tconn->agreed_pro_version < 91)
2794                                 return -1091;
2795
2796                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2797                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2798                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2799                                 drbd_uuid_set_bm(mdev, 0UL);
2800
2801                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2802                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2803                                 *rule_nr = 34;
2804                         } else {
2805                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2806                                 *rule_nr = 36;
2807                         }
2808
2809                         return 1;
2810                 }
2811
2812                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2813
2814                         if (mdev->tconn->agreed_pro_version < 91)
2815                                 return -1091;
2816
2817                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2818                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2819                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2820
2821                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2822                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2823                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2824
2825                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2826                                 *rule_nr = 35;
2827                         } else {
2828                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2829                                 *rule_nr = 37;
2830                         }
2831
2832                         return -1;
2833                 }
2834
2835                 /* Common power [off|failure] */
2836                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2837                         (mdev->p_uuid[UI_FLAGS] & 2);
2838                 /* lowest bit is set when we were primary,
2839                  * next bit (weight 2) is set when peer was primary */
2840                 *rule_nr = 40;
2841
2842                 switch (rct) {
2843                 case 0: /* !self_pri && !peer_pri */ return 0;
2844                 case 1: /*  self_pri && !peer_pri */ return 1;
2845                 case 2: /* !self_pri &&  peer_pri */ return -1;
2846                 case 3: /*  self_pri &&  peer_pri */
2847                         dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2848                         return dc ? -1 : 1;
2849                 }
2850         }
2851
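        /* The current UUIDs differ. Look for one side's current UUID among the
         * other side's bitmap and history UUIDs to work out which side is ahead
         * (rules 50..80 below). */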
2852         *rule_nr = 50;
2853         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2854         if (self == peer)
2855                 return -1;
2856
2857         *rule_nr = 51;
2858         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2859         if (self == peer) {
2860                 if (mdev->tconn->agreed_pro_version < 96 ?
2861                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2862                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2863                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2864                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2865                            the peer made to its UUIDs when it last started a resync as sync source. */
2866
2867                         if (mdev->tconn->agreed_pro_version < 91)
2868                                 return -1091;
2869
2870                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2871                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2872
2873                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2874                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2875
2876                         return -1;
2877                 }
2878         }
2879
2880         *rule_nr = 60;
2881         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2882         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2883                 peer = mdev->p_uuid[i] & ~((u64)1);
2884                 if (self == peer)
2885                         return -2;
2886         }
2887
2888         *rule_nr = 70;
2889         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2890         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2891         if (self == peer)
2892                 return 1;
2893
2894         *rule_nr = 71;
2895         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2896         if (self == peer) {
2897                 if (mdev->tconn->agreed_pro_version < 96 ?
2898                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2899                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2900                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2901                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2902                            we made to our own UUIDs when we last started a resync as sync source. */
2903
2904                         if (mdev->tconn->agreed_pro_version < 91)
2905                                 return -1091;
2906
2907                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2908                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2909
2910                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2911                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2912                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2913
2914                         return 1;
2915                 }
2916         }
2917
2918
2919         *rule_nr = 80;
2920         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2921         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2922                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2923                 if (self == peer)
2924                         return 2;
2925         }
2926
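        /* Matching bitmap UUIDs (rule 90) or merely matching history UUIDs
         * (rule 100) both indicate split brain; the return value tells the caller
         * whether automatic recovery may be attempted (see the table above). */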
2927         *rule_nr = 90;
2928         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2929         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2930         if (self == peer && self != ((u64)0))
2931                 return 100;
2932
2933         *rule_nr = 100;
2934         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2935                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2936                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2937                         peer = mdev->p_uuid[j] & ~((u64)1);
2938                         if (self == peer)
2939                                 return -100;
2940                 }
2941         }
2942
2943         return -1000;
2944 }
2945
2946 /* drbd_sync_handshake() returns the new conn state on success, or
2947    C_MASK on failure.
2948  */
2949 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2950                                            enum drbd_disk_state peer_disk) __must_hold(local)
2951 {
2952         enum drbd_conns rv = C_MASK;
2953         enum drbd_disk_state mydisk;
2954         struct net_conf *nc;
2955         int hg, rule_nr, rr_conflict, tentative;
2956
2957         mydisk = mdev->state.disk;
2958         if (mydisk == D_NEGOTIATING)
2959                 mydisk = mdev->new_state_tmp.disk;
2960
2961         dev_info(DEV, "drbd_sync_handshake:\n");
2962         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2963         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2964                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2965
2966         hg = drbd_uuid_compare(mdev, &rule_nr);
2967
2968         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2969
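        /* hg encodes the result as documented above drbd_uuid_compare(): the sign
         * selects the sync direction (positive: we become sync source), |hg| == 2
         * forces a full sync, |hg| == 100 means split brain, and values of -1000
         * and below are fatal (unrelated data or a too old protocol). */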
2970         if (hg == -1000) {
2971                 dev_alert(DEV, "Unrelated data, aborting!\n");
2972                 return C_MASK;
2973         }
2974         if (hg < -1000) {
2975                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2976                 return C_MASK;
2977         }
2978
2979         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2980             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2981                 int f = (hg == -100) || abs(hg) == 2;
2982                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2983                 if (f)
2984                         hg = hg*2;
2985                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2986                      hg > 0 ? "source" : "target");
2987         }
2988
2989         if (abs(hg) == 100)
2990                 drbd_khelper(mdev, "initial-split-brain");
2991
2992         rcu_read_lock();
2993         nc = rcu_dereference(mdev->tconn->net_conf);
2994
2995         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2996                 int pcount = (mdev->state.role == R_PRIMARY)
2997                            + (peer_role == R_PRIMARY);
2998                 int forced = (hg == -100);
2999
3000                 switch (pcount) {
3001                 case 0:
3002                         hg = drbd_asb_recover_0p(mdev);
3003                         break;
3004                 case 1:
3005                         hg = drbd_asb_recover_1p(mdev);
3006                         break;
3007                 case 2:
3008                         hg = drbd_asb_recover_2p(mdev);
3009                         break;
3010                 }
3011                 if (abs(hg) < 100) {
3012                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
3013                              "automatically solved. Sync from %s node\n",
3014                              pcount, (hg < 0) ? "peer" : "this");
3015                         if (forced) {
3016                                 dev_warn(DEV, "Doing a full sync, since"
3017                                      " UUIDs were ambiguous.\n");
3018                                 hg = hg*2;
3019                         }
3020                 }
3021         }
3022
3023         if (hg == -100) {
3024                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3025                         hg = -1;
3026                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3027                         hg = 1;
3028
3029                 if (abs(hg) < 100)
3030                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3031                              "Sync from %s node\n",
3032                              (hg < 0) ? "peer" : "this");
3033         }
3034         rr_conflict = nc->rr_conflict;
3035         tentative = nc->tentative;
3036         rcu_read_unlock();
3037
3038         if (hg == -100) {
3039                 /* FIXME this log message is not correct if we end up here
3040                  * after an attempted attach on a diskless node.
3041                  * We just refuse to attach -- well, we drop the "connection"
3042                  * to that disk, in a way... */
3043                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3044                 drbd_khelper(mdev, "split-brain");
3045                 return C_MASK;
3046         }
3047
3048         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3049                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3050                 return C_MASK;
3051         }
3052
3053         if (hg < 0 && /* by intention we do not use mydisk here. */
3054             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3055                 switch (rr_conflict) {
3056                 case ASB_CALL_HELPER:
3057                         drbd_khelper(mdev, "pri-lost");
3058                         /* fall through */
3059                 case ASB_DISCONNECT:
3060                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3061                         return C_MASK;
3062                 case ASB_VIOLENTLY:
3063                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3064                              " assumption\n");
3065                 }
3066         }
3067
3068         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3069                 if (hg == 0)
3070                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3071                 else
3072                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3073                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3074                                  abs(hg) >= 2 ? "full" : "bit-map based");
3075                 return C_MASK;
3076         }
3077
3078         if (abs(hg) >= 2) {
3079                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3080                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3081                                         BM_LOCKED_SET_ALLOWED))
3082                         return C_MASK;
3083         }
3084
3085         if (hg > 0) { /* become sync source. */
3086                 rv = C_WF_BITMAP_S;
3087         } else if (hg < 0) { /* become sync target */
3088                 rv = C_WF_BITMAP_T;
3089         } else {
3090                 rv = C_CONNECTED;
3091                 if (drbd_bm_total_weight(mdev)) {
3092                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3093                              drbd_bm_total_weight(mdev));
3094                 }
3095         }
3096
3097         return rv;
3098 }
3099
3100 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3101 {
3102         /* ASB_DISCARD_REMOTE on the peer paired with ASB_DISCARD_LOCAL here is valid */
3103         if (peer == ASB_DISCARD_REMOTE)
3104                 return ASB_DISCARD_LOCAL;
3105
3106         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3107         if (peer == ASB_DISCARD_LOCAL)
3108                 return ASB_DISCARD_REMOTE;
3109
3110         /* everything else is valid only if it is equal on both sides. */
3111         return peer;
3112 }
3113
3114 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3115 {
3116         struct p_protocol *p = pi->data;
3117         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3118         int p_proto, p_discard_my_data, p_two_primaries, cf;
3119         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3120         char integrity_alg[SHARED_SECRET_MAX] = "";
3121         struct crypto_hash *peer_integrity_tfm = NULL;
3122         void *int_dig_in = NULL, *int_dig_vv = NULL;
3123
3124         p_proto         = be32_to_cpu(p->protocol);
3125         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3126         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3127         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3128         p_two_primaries = be32_to_cpu(p->two_primaries);
3129         cf              = be32_to_cpu(p->conn_flags);
3130         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3131
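        /* Since protocol version 87 the name of the data integrity algorithm
         * follows the fixed-size fields as a variable-length payload. */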
3132         if (tconn->agreed_pro_version >= 87) {
3133                 int err;
3134
3135                 if (pi->size > sizeof(integrity_alg))
3136                         return -EIO;
3137                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3138                 if (err)
3139                         return err;
3140                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3141         }
3142
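        /* A plain P_PROTOCOL announces the peer's settings while connecting and
         * must be compatible with our own; a P_PROTOCOL_UPDATE skips the checks
         * below and the new settings are simply taken over. */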
3143         if (pi->cmd != P_PROTOCOL_UPDATE) {
3144                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3145
3146                 if (cf & CF_DRY_RUN)
3147                         set_bit(CONN_DRY_RUN, &tconn->flags);
3148
3149                 rcu_read_lock();
3150                 nc = rcu_dereference(tconn->net_conf);
3151
3152                 if (p_proto != nc->wire_protocol) {
3153                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3154                         goto disconnect_rcu_unlock;
3155                 }
3156
3157                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3158                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3159                         goto disconnect_rcu_unlock;
3160                 }
3161
3162                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3163                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3164                         goto disconnect_rcu_unlock;
3165                 }
3166
3167                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3168                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3169                         goto disconnect_rcu_unlock;
3170                 }
3171
3172                 if (p_discard_my_data && nc->discard_my_data) {
3173                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3174                         goto disconnect_rcu_unlock;
3175                 }
3176
3177                 if (p_two_primaries != nc->two_primaries) {
3178                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3179                         goto disconnect_rcu_unlock;
3180                 }
3181
3182                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3183                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3184                         goto disconnect_rcu_unlock;
3185                 }
3186
3187                 rcu_read_unlock();
3188         }
3189
3190         if (integrity_alg[0]) {
3191                 int hash_size;
3192
3193                 /*
3194                  * We can only change the peer data integrity algorithm
3195                  * here.  Changing our own data integrity algorithm
3196                  * requires that we send a P_PROTOCOL_UPDATE packet at
3197                  * the same time; otherwise, the peer has no way of
3198                  * knowing between which packets the algorithm should
3199                  * change.
3200                  */
3201
3202                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3203                 if (!peer_integrity_tfm) {
3204                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3205                                  integrity_alg);
3206                         goto disconnect;
3207                 }
3208
3209                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3210                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3211                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3212                 if (!(int_dig_in && int_dig_vv)) {
3213                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3214                         goto disconnect;
3215                 }
3216         }
3217
3218         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3219         if (!new_net_conf) {
3220                 conn_err(tconn, "Allocation of new net_conf failed\n");
3221                 goto disconnect;
3222         }
3223
3224         mutex_lock(&tconn->data.mutex);
3225         mutex_lock(&tconn->conf_update);
3226         old_net_conf = tconn->net_conf;
3227         *new_net_conf = *old_net_conf;
3228
3229         new_net_conf->wire_protocol = p_proto;
3230         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3231         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3232         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3233         new_net_conf->two_primaries = p_two_primaries;
3234
3235         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3236         mutex_unlock(&tconn->conf_update);
3237         mutex_unlock(&tconn->data.mutex);
3238
3239         crypto_free_hash(tconn->peer_integrity_tfm);
3240         kfree(tconn->int_dig_in);
3241         kfree(tconn->int_dig_vv);
3242         tconn->peer_integrity_tfm = peer_integrity_tfm;
3243         tconn->int_dig_in = int_dig_in;
3244         tconn->int_dig_vv = int_dig_vv;
3245
3246         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3247                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3248                           integrity_alg[0] ? integrity_alg : "(none)");
3249
3250         synchronize_rcu();
3251         kfree(old_net_conf);
3252         return 0;
3253
3254 disconnect_rcu_unlock:
3255         rcu_read_unlock();
3256 disconnect:
3257         crypto_free_hash(peer_integrity_tfm);
3258         kfree(int_dig_in);
3259         kfree(int_dig_vv);
3260         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3261         return -EIO;
3262 }
3263
3264 /* helper function
3265  * input: alg name, feature name
3266  * return: NULL (alg name was "")
3267  *         ERR_PTR(error) if something goes wrong
3268  *         or the crypto hash ptr, if it worked out ok. */
3269 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3270                 const char *alg, const char *name)
3271 {
3272         struct crypto_hash *tfm;
3273
3274         if (!alg[0])
3275                 return NULL;
3276
3277         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3278         if (IS_ERR(tfm)) {
3279                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3280                         alg, name, PTR_ERR(tfm));
3281                 return tfm;
3282         }
3283         return tfm;
3284 }
3285
3286 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3287 {
3288         void *buffer = tconn->data.rbuf;
3289         int size = pi->size;
3290
3291         while (size) {
3292                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3293                 s = drbd_recv(tconn, buffer, s);
3294                 if (s <= 0) {
3295                         if (s < 0)
3296                                 return s;
3297                         break;
3298                 }
3299                 size -= s;
3300         }
3301         if (size)
3302                 return -EIO;
3303         return 0;
3304 }
3305
3306 /*
3307  * config_unknown_volume  -  device configuration command for unknown volume
3308  *
3309  * When a device is added to an existing connection, the node on which the
3310  * device is added first will send configuration commands to its peer but the
3311  * peer will not know about the device yet.  It will warn and ignore these
3312  * commands.  Once the device is added on the second node, the second node will
3313  * send the same device configuration commands, but in the other direction.
3314  *
3315  * (We can also end up here if drbd is misconfigured.)
3316  */
3317 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3318 {
3319         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3320                   cmdname(pi->cmd), pi->vnr);
3321         return ignore_remaining_packet(tconn, pi);
3322 }
3323
3324 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3325 {
3326         struct drbd_conf *mdev;
3327         struct p_rs_param_95 *p;
3328         unsigned int header_size, data_size, exp_max_sz;
3329         struct crypto_hash *verify_tfm = NULL;
3330         struct crypto_hash *csums_tfm = NULL;
3331         struct net_conf *old_net_conf, *new_net_conf = NULL;
3332         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3333         const int apv = tconn->agreed_pro_version;
3334         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3335         int fifo_size = 0;
3336         int err;
3337
3338         mdev = vnr_to_mdev(tconn, pi->vnr);
3339         if (!mdev)
3340                 return config_unknown_volume(tconn, pi);
3341
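        /* The SyncParam packet grew over time: up to protocol 87 it only carries
         * the resync rate, 88 appends the verify-alg name, 89..94 use fixed-size
         * verify-alg and csums-alg fields (p_rs_param_89), and 95 and later add
         * the dynamic resync controller settings (p_rs_param_95). */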
3342         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3343                     : apv == 88 ? sizeof(struct p_rs_param)
3344                                         + SHARED_SECRET_MAX
3345                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3346                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3347
3348         if (pi->size > exp_max_sz) {
3349                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3350                     pi->size, exp_max_sz);
3351                 return -EIO;
3352         }
3353
3354         if (apv <= 88) {
3355                 header_size = sizeof(struct p_rs_param);
3356                 data_size = pi->size - header_size;
3357         } else if (apv <= 94) {
3358                 header_size = sizeof(struct p_rs_param_89);
3359                 data_size = pi->size - header_size;
3360                 D_ASSERT(data_size == 0);
3361         } else {
3362                 header_size = sizeof(struct p_rs_param_95);
3363                 data_size = pi->size - header_size;
3364                 D_ASSERT(data_size == 0);
3365         }
3366
3367         /* initialize verify_alg and csums_alg */
3368         p = pi->data;
3369         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3370
3371         err = drbd_recv_all(mdev->tconn, p, header_size);
3372         if (err)
3373                 return err;
3374
3375         mutex_lock(&mdev->tconn->conf_update);
3376         old_net_conf = mdev->tconn->net_conf;
3377         if (get_ldev(mdev)) {
3378                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3379                 if (!new_disk_conf) {
3380                         put_ldev(mdev);
3381                         mutex_unlock(&mdev->tconn->conf_update);
3382                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3383                         return -ENOMEM;
3384                 }
3385
3386                 old_disk_conf = mdev->ldev->disk_conf;
3387                 *new_disk_conf = *old_disk_conf;
3388
3389                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3390         }
3391
3392         if (apv >= 88) {
3393                 if (apv == 88) {
3394                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3395                                 dev_err(DEV, "verify-alg of wrong size, "
3396                                         "peer wants %u, accepting only up to %u bytes\n",
3397                                         data_size, SHARED_SECRET_MAX);
3398                                 err = -EIO;
3399                                 goto reconnect;
3400                         }
3401
3402                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3403                         if (err)
3404                                 goto reconnect;
3405                         /* we expect a NUL-terminated string */
3406                         /* but just in case someone tries to be evil */
3407                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3408                         p->verify_alg[data_size-1] = 0;
3409
3410                 } else /* apv >= 89 */ {
3411                         /* we still expect NUL terminated strings */
3412                         /* but just in case someone tries to be evil */
3413                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3414                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3415                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3416                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3417                 }
3418
3419                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3420                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3421                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3422                                     old_net_conf->verify_alg, p->verify_alg);
3423                                 goto disconnect;
3424                         }
3425                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3426                                         p->verify_alg, "verify-alg");
3427                         if (IS_ERR(verify_tfm)) {
3428                                 verify_tfm = NULL;
3429                                 goto disconnect;
3430                         }
3431                 }
3432
3433                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3434                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3435                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3436                                     old_net_conf->csums_alg, p->csums_alg);
3437                                 goto disconnect;
3438                         }
3439                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3440                                         p->csums_alg, "csums-alg");
3441                         if (IS_ERR(csums_tfm)) {
3442                                 csums_tfm = NULL;
3443                                 goto disconnect;
3444                         }
3445                 }
3446
3447                 if (apv > 94 && new_disk_conf) {
3448                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3449                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3450                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3451                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3452
3453                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3454                         if (fifo_size != mdev->rs_plan_s->size) {
3455                                 new_plan = fifo_alloc(fifo_size);
3456                                 if (!new_plan) {
3457                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3458                                         put_ldev(mdev);
3459                                         goto disconnect;
3460                                 }
3461                         }
3462                 }
3463
3464                 if (verify_tfm || csums_tfm) {
3465                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3466                         if (!new_net_conf) {
3467                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3468                                 goto disconnect;
3469                         }
3470
3471                         *new_net_conf = *old_net_conf;
3472
3473                         if (verify_tfm) {
3474                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3475                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3476                                 crypto_free_hash(mdev->tconn->verify_tfm);
3477                                 mdev->tconn->verify_tfm = verify_tfm;
3478                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3479                         }
3480                         if (csums_tfm) {
3481                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3482                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3483                                 crypto_free_hash(mdev->tconn->csums_tfm);
3484                                 mdev->tconn->csums_tfm = csums_tfm;
3485                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3486                         }
3487                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3488                 }
3489         }
3490
3491         if (new_disk_conf) {
3492                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3493                 put_ldev(mdev);
3494         }
3495
3496         if (new_plan) {
3497                 old_plan = mdev->rs_plan_s;
3498                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3499         }
3500
3501         mutex_unlock(&mdev->tconn->conf_update);
3502         synchronize_rcu();
3503         if (new_net_conf)
3504                 kfree(old_net_conf);
3505         kfree(old_disk_conf);
3506         kfree(old_plan);
3507
3508         return 0;
3509
3510 reconnect:
3511         if (new_disk_conf) {
3512                 put_ldev(mdev);
3513                 kfree(new_disk_conf);
3514         }
3515         mutex_unlock(&mdev->tconn->conf_update);
3516         return -EIO;
3517
3518 disconnect:
3519         kfree(new_plan);
3520         if (new_disk_conf) {
3521                 put_ldev(mdev);
3522                 kfree(new_disk_conf);
3523         }
3524         mutex_unlock(&mdev->tconn->conf_update);
3525         /* just for completeness: actually not needed,
3526          * as this is not reached if csums_tfm was ok. */
3527         crypto_free_hash(csums_tfm);
3528         /* but free the verify_tfm again, if csums_tfm did not work out */
3529         crypto_free_hash(verify_tfm);
3530         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3531         return -EIO;
3532 }
3533
3534 /* warn if the arguments differ by more than 12.5% */
3535 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3536         const char *s, sector_t a, sector_t b)
3537 {
3538         sector_t d;
3539         if (a == 0 || b == 0)
3540                 return;
3541         d = (a > b) ? (a - b) : (b - a);
3542         if (d > (a>>3) || d > (b>>3))
3543                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3544                      (unsigned long long)a, (unsigned long long)b);
3545 }
3546
3547 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3548 {
3549         struct drbd_conf *mdev;
3550         struct p_sizes *p = pi->data;
3551         enum determine_dev_size dd = unchanged;
3552         sector_t p_size, p_usize, my_usize;
3553         int ldsc = 0; /* local disk size changed */
3554         enum dds_flags ddsf;
3555
3556         mdev = vnr_to_mdev(tconn, pi->vnr);
3557         if (!mdev)
3558                 return config_unknown_volume(tconn, pi);
3559
3560         p_size = be64_to_cpu(p->d_size);
3561         p_usize = be64_to_cpu(p->u_size);
3562
3563         /* just store the peer's disk size for now.
3564          * we still need to figure out whether we accept that. */
3565         mdev->p_size = p_size;
3566
3567         if (get_ldev(mdev)) {
3568                 rcu_read_lock();
3569                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3570                 rcu_read_unlock();
3571
3572                 warn_if_differ_considerably(mdev, "lower level device sizes",
3573                            p_size, drbd_get_max_capacity(mdev->ldev));
3574                 warn_if_differ_considerably(mdev, "user requested size",
3575                                             p_usize, my_usize);
3576
3577                 /* if this is the first connect, or an otherwise expected
3578                  * param exchange, choose the minimum */
3579                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3580                         p_usize = min_not_zero(my_usize, p_usize);
3581
3582                 /* Never shrink a device with usable data during connect.
3583                    But allow online shrinking if we are connected. */
3584                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3585                     drbd_get_capacity(mdev->this_bdev) &&
3586                     mdev->state.disk >= D_OUTDATED &&
3587                     mdev->state.conn < C_CONNECTED) {
3588                         dev_err(DEV, "The peer's disk size is too small!\n");
3589                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3590                         put_ldev(mdev);
3591                         return -EIO;
3592                 }
3593
3594                 if (my_usize != p_usize) {
3595                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3596
3597                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3598                         if (!new_disk_conf) {
3599                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3600                                 put_ldev(mdev);
3601                                 return -ENOMEM;
3602                         }
3603
3604                         mutex_lock(&mdev->tconn->conf_update);
3605                         old_disk_conf = mdev->ldev->disk_conf;
3606                         *new_disk_conf = *old_disk_conf;
3607                         new_disk_conf->disk_size = p_usize;
3608
3609                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3610                         mutex_unlock(&mdev->tconn->conf_update);
3611                         synchronize_rcu();
3612                         kfree(old_disk_conf);
3613
3614                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3615                                  (unsigned long)p_usize);
3616                 }
3617
3618                 put_ldev(mdev);
3619         }
3620
3621         ddsf = be16_to_cpu(p->dds_flags);
3622         if (get_ldev(mdev)) {
3623                 dd = drbd_determine_dev_size(mdev, ddsf);
3624                 put_ldev(mdev);
3625                 if (dd == dev_size_error)
3626                         return -EIO;
3627                 drbd_md_sync(mdev);
3628         } else {
3629                 /* I am diskless, need to accept the peer's size. */
3630                 drbd_set_my_capacity(mdev, p_size);
3631         }
3632
3633         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3634         drbd_reconsider_max_bio_size(mdev);
3635
3636         if (get_ldev(mdev)) {
3637                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3638                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3639                         ldsc = 1;
3640                 }
3641
3642                 put_ldev(mdev);
3643         }
3644
3645         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3646                 if (be64_to_cpu(p->c_size) !=
3647                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3648                         /* we have different sizes, probably peer
3649                          * needs to know my new size... */
3650                         drbd_send_sizes(mdev, 0, ddsf);
3651                 }
3652                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3653                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3654                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3655                             mdev->state.disk >= D_INCONSISTENT) {
3656                                 if (ddsf & DDSF_NO_RESYNC)
3657                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3658                                 else
3659                                         resync_after_online_grow(mdev);
3660                         } else
3661                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3662                 }
3663         }
3664
3665         return 0;
3666 }
3667
3668 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3669 {
3670         struct drbd_conf *mdev;
3671         struct p_uuids *p = pi->data;
3672         u64 *p_uuid;
3673         int i, updated_uuids = 0;
3674
3675         mdev = vnr_to_mdev(tconn, pi->vnr);
3676         if (!mdev)
3677                 return config_unknown_volume(tconn, pi);
3678
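        /* p_uuid mirrors the peer's UUIDs plus the UI_SIZE and UI_FLAGS words
         * that are transmitted along with them, hence UI_EXTENDED_SIZE. */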
3679         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                /* keep the old UUIDs (if any) and ignore this packet */
                return false;
        }
3680
3681         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3682                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3683
3684         kfree(mdev->p_uuid);
3685         mdev->p_uuid = p_uuid;
3686
3687         if (mdev->state.conn < C_CONNECTED &&
3688             mdev->state.disk < D_INCONSISTENT &&
3689             mdev->state.role == R_PRIMARY &&
3690             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3691                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3692                     (unsigned long long)mdev->ed_uuid);
3693                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3694                 return -EIO;
3695         }
3696
3697         if (get_ldev(mdev)) {
3698                 int skip_initial_sync =
3699                         mdev->state.conn == C_CONNECTED &&
3700                         mdev->tconn->agreed_pro_version >= 90 &&
3701                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3702                         (p_uuid[UI_FLAGS] & 8);
3703                 if (skip_initial_sync) {
3704                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3705                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3706                                         "clear_n_write from receive_uuids",
3707                                         BM_LOCKED_TEST_ALLOWED);
3708                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3709                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3710                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3711                                         CS_VERBOSE, NULL);
3712                         drbd_md_sync(mdev);
3713                         updated_uuids = 1;
3714                 }
3715                 put_ldev(mdev);
3716         } else if (mdev->state.disk < D_INCONSISTENT &&
3717                    mdev->state.role == R_PRIMARY) {
3718                 /* I am a diskless primary, the peer just created a new current UUID
3719                    for me. */
3720                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3721         }
3722
3723         /* Before we test the disk state we should wait until a possibly
3724            ongoing cluster-wide state change has finished. That is important if
3725            we are primary and are detaching from our disk. We need to see the
3726            new disk state... */
3727         mutex_lock(mdev->state_mutex);
3728         mutex_unlock(mdev->state_mutex);
3729         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3730                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3731
3732         if (updated_uuids)
3733                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3734
3735         return 0;
3736 }
3737
3738 /**
3739  * convert_state() - Converts the peer's view of the cluster state to our point of view
3740  * @ps:         The state as seen by the peer.
3741  */
3742 static union drbd_state convert_state(union drbd_state ps)
3743 {
3744         union drbd_state ms;
3745
3746         static enum drbd_conns c_tab[] = {
3747                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3748                 [C_CONNECTED] = C_CONNECTED,
3749
3750                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3751                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3752                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3753                 [C_VERIFY_S]       = C_VERIFY_T,
3754                 [C_MASK]   = C_MASK,
3755         };
3756
3757         ms.i = ps.i;
3758
3759         ms.conn = c_tab[ps.conn];
3760         ms.peer = ps.role;
3761         ms.role = ps.peer;
3762         ms.pdsk = ps.disk;
3763         ms.disk = ps.pdsk;
3764         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3765
3766         return ms;
3767 }
3768
3769 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3770 {
3771         struct drbd_conf *mdev;
3772         struct p_req_state *p = pi->data;
3773         union drbd_state mask, val;
3774         enum drbd_state_rv rv;
3775
3776         mdev = vnr_to_mdev(tconn, pi->vnr);
3777         if (!mdev)
3778                 return -EIO;
3779
3780         mask.i = be32_to_cpu(p->mask);
3781         val.i = be32_to_cpu(p->val);
3782
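        /* If we are the side that resolves conflicts and a local state change is
         * already in progress, report a concurrent state change to the peer
         * instead of racing its request against ours. */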
3783         if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3784             mutex_is_locked(mdev->state_mutex)) {
3785                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3786                 return 0;
3787         }
3788
3789         mask = convert_state(mask);
3790         val = convert_state(val);
3791
3792         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3793         drbd_send_sr_reply(mdev, rv);
3794
3795         drbd_md_sync(mdev);
3796
3797         return 0;
3798 }
3799
3800 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3801 {
3802         struct p_req_state *p = pi->data;
3803         union drbd_state mask, val;
3804         enum drbd_state_rv rv;
3805
3806         mask.i = be32_to_cpu(p->mask);
3807         val.i = be32_to_cpu(p->val);
3808
3809         if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3810             mutex_is_locked(&tconn->cstate_mutex)) {
3811                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3812                 return 0;
3813         }
3814
3815         mask = convert_state(mask);
3816         val = convert_state(val);
3817
3818         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3819         conn_send_sr_reply(tconn, rv);
3820
3821         return 0;
3822 }
3823
3824 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3825 {
3826         struct drbd_conf *mdev;
3827         struct p_state *p = pi->data;
3828         union drbd_state os, ns, peer_state;
3829         enum drbd_disk_state real_peer_disk;
3830         enum chg_state_flags cs_flags;
3831         int rv;
3832
3833         mdev = vnr_to_mdev(tconn, pi->vnr);
3834         if (!mdev)
3835                 return config_unknown_volume(tconn, pi);
3836
3837         peer_state.i = be32_to_cpu(p->state);
3838
3839         real_peer_disk = peer_state.disk;
3840         if (peer_state.disk == D_NEGOTIATING) {
3841                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3842                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3843         }
3844
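        /* Take a snapshot of our current state. If it has changed by the time we
         * try to commit the new state further down, jump back to retry. */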
3845         spin_lock_irq(&mdev->tconn->req_lock);
3846  retry:
3847         os = ns = drbd_read_state(mdev);
3848         spin_unlock_irq(&mdev->tconn->req_lock);
3849
3850         /* If some other part of the code (asender thread, timeout)
3851          * already decided to close the connection again,
3852          * we must not "re-establish" it here. */
3853         if (os.conn <= C_TEAR_DOWN)
3854                 return -ECONNRESET;
3855
3856         /* If this is the "end of sync" confirmation, usually the peer disk
3857          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3858          * set) resync started in PausedSyncT, or if the timing of pause-/
3859          * unpause-sync events has been "just right", the peer disk may
3860          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3861          */
3862         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3863             real_peer_disk == D_UP_TO_DATE &&
3864             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3865                 /* If we are (becoming) SyncSource, but peer is still in sync
3866                  * preparation, ignore its uptodate-ness to avoid flapping, it
3867                  * will change to inconsistent once the peer reaches active
3868                  * syncing states.
3869                  * It may have changed syncer-paused flags, however, so we
3870                  * cannot ignore this completely. */
3871                 if (peer_state.conn > C_CONNECTED &&
3872                     peer_state.conn < C_SYNC_SOURCE)
3873                         real_peer_disk = D_INCONSISTENT;
3874
3875                 /* if peer_state changes to connected at the same time,
3876                  * it explicitly notifies us that it finished resync.
3877                  * Maybe we should finish it up, too? */
3878                 else if (os.conn >= C_SYNC_SOURCE &&
3879                          peer_state.conn == C_CONNECTED) {
3880                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3881                                 drbd_resync_finished(mdev);
3882                         return 0;
3883                 }
3884         }
3885
3886         /* explicit verify finished notification, stop sector reached. */
3887         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3888             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3889                 ov_out_of_sync_print(mdev);
3890                 drbd_resync_finished(mdev);
3891                 return 0;
3892         }
3893
3894         /* peer says his disk is inconsistent, while we think it is uptodate,
3895          * and this happens while the peer still thinks we have a sync going on,
3896          * but we think we are already done with the sync.
3897          * We ignore this to avoid flapping pdsk.
3898          * This should not happen, if the peer is a recent version of drbd. */
3899         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3900             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3901                 real_peer_disk = D_UP_TO_DATE;
3902
3903         if (ns.conn == C_WF_REPORT_PARAMS)
3904                 ns.conn = C_CONNECTED;
3905
3906         if (peer_state.conn == C_AHEAD)
3907                 ns.conn = C_BEHIND;
3908
3909         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3910             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3911                 int cr; /* consider resync */
3912
3913                 /* if we established a new connection */
3914                 cr  = (os.conn < C_CONNECTED);
3915                 /* if we had an established connection
3916                  * and one of the nodes newly attaches a disk */
3917                 cr |= (os.conn == C_CONNECTED &&
3918                        (peer_state.disk == D_NEGOTIATING ||
3919                         os.disk == D_NEGOTIATING));
3920                 /* if we have both been inconsistent, and the peer has been
3921                  * forced to be UpToDate with --overwrite-data */
3922                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3923                 /* if we had been plain connected, and the admin requested to
3924                  * start a sync by "invalidate" or "invalidate-remote" */
3925                 cr |= (os.conn == C_CONNECTED &&
3926                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3927                                  peer_state.conn <= C_WF_BITMAP_T));
3928
3929                 if (cr)
3930                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3931
3932                 put_ldev(mdev);
3933                 if (ns.conn == C_MASK) {
3934                         ns.conn = C_CONNECTED;
3935                         if (mdev->state.disk == D_NEGOTIATING) {
3936                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3937                         } else if (peer_state.disk == D_NEGOTIATING) {
3938                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3939                                 peer_state.disk = D_DISKLESS;
3940                                 real_peer_disk = D_DISKLESS;
3941                         } else {
3942                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3943                                         return -EIO;
3944                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3945                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3946                                 return -EIO;
3947                         }
3948                 }
3949         }
3950
3951         spin_lock_irq(&mdev->tconn->req_lock);
3952         if (os.i != drbd_read_state(mdev).i)
3953                 goto retry;
3954         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3955         ns.peer = peer_state.role;
3956         ns.pdsk = real_peer_disk;
3957         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3958         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3959                 ns.disk = mdev->new_state_tmp.disk;
3960         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3961         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3962             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3963                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3964                    for temporary network outages! */
3965                 spin_unlock_irq(&mdev->tconn->req_lock);
3966                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3967                 tl_clear(mdev->tconn);
3968                 drbd_uuid_new_current(mdev);
3969                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3970                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3971                 return -EIO;
3972         }
3973         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3974         ns = drbd_read_state(mdev);
3975         spin_unlock_irq(&mdev->tconn->req_lock);
3976
3977         if (rv < SS_SUCCESS) {
3978                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3979                 return -EIO;
3980         }
3981
3982         if (os.conn > C_WF_REPORT_PARAMS) {
3983                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3984                     peer_state.disk != D_NEGOTIATING ) {
3985                         /* we want resync, peer has not yet decided to sync... */
3986                         /* Nowadays only used when forcing a node into primary role and
3987                            setting its disk to UpToDate with that */
3988                         drbd_send_uuids(mdev);
3989                         drbd_send_current_state(mdev);
3990                 }
3991         }
3992
3993         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3994
3995         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3996
3997         return 0;
3998 }
3999
4000 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
4001 {
4002         struct drbd_conf *mdev;
4003         struct p_rs_uuid *p = pi->data;
4004
4005         mdev = vnr_to_mdev(tconn, pi->vnr);
4006         if (!mdev)
4007                 return -EIO;
4008
4009         wait_event(mdev->misc_wait,
4010                    mdev->state.conn == C_WF_SYNC_UUID ||
4011                    mdev->state.conn == C_BEHIND ||
4012                    mdev->state.conn < C_CONNECTED ||
4013                    mdev->state.disk < D_NEGOTIATING);
4014
4015         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4016
4017         /* Here the _drbd_uuid_ functions are right, current should
4018            _not_ be rotated into the history */
4019         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4020                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4021                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4022
4023                 drbd_print_uuids(mdev, "updated sync uuid");
4024                 drbd_start_resync(mdev, C_SYNC_TARGET);
4025
4026                 put_ldev(mdev);
4027         } else
4028                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4029
4030         return 0;
4031 }
4032
4033 /**
4034  * receive_bitmap_plain() - receive one plain (uncompressed) chunk of the bitmap
4035  *
4036  * Return 0 when done, 1 when another iteration is needed, and a negative error
4037  * code upon failure.
4038  */
4039 static int
4040 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4041                      unsigned long *p, struct bm_xfer_ctx *c)
4042 {
4043         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4044                                  drbd_header_size(mdev->tconn);
4045         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4046                                        c->bm_words - c->word_offset);
4047         unsigned int want = num_words * sizeof(*p);
4048         int err;
4049
4050         if (want != size) {
4051                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4052                 return -EIO;
4053         }
4054         if (want == 0)
4055                 return 0;
4056         err = drbd_recv_all(mdev->tconn, p, want);
4057         if (err)
4058                 return err;
4059
4060         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4061
4062         c->word_offset += num_words;
4063         c->bit_offset = c->word_offset * BITS_PER_LONG;
4064         if (c->bit_offset > c->bm_bits)
4065                 c->bit_offset = c->bm_bits;
4066
4067         return 1;
4068 }
4069
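     /* Layout of p_compressed_bm->encoding, as decoded by the helpers below:
      * bits 0-3: the bitmap encoding used (enum drbd_bitmap_code),
      * bits 4-6: number of padding bits in the bit stream (for bitstream_init()),
      * bit    7: value of the first run ("toggle" start) of the RLE code. */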
4070 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4071 {
4072         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4073 }
4074
4075 static int dcbp_get_start(struct p_compressed_bm *p)
4076 {
4077         return (p->encoding & 0x80) != 0;
4078 }
4079
4080 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4081 {
4082         return (p->encoding >> 4) & 0x7;
4083 }
4084
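     /* The compressed bitmap payload is a stream of run lengths, encoded with
      * the variable length integer code from drbd_vli.h.  Runs alternately
      * describe ranges of clear and set bits; dcbp_get_start() says whether
      * the first run is a "set" run.  recv_bm_rle_bits() keeps up to 64 bits
      * of look-ahead, decodes one run length at a time, and applies the "set"
      * runs to the local bitmap via _drbd_bm_set_bits(). */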
4085 /**
4086  * recv_bm_rle_bits
4087  *
4088  * Return 0 when done, 1 when another iteration is needed, and a negative error
4089  * code upon failure.
4090  */
4091 static int
4092 recv_bm_rle_bits(struct drbd_conf *mdev,
4093                 struct p_compressed_bm *p,
4094                  struct bm_xfer_ctx *c,
4095                  unsigned int len)
4096 {
4097         struct bitstream bs;
4098         u64 look_ahead;
4099         u64 rl;
4100         u64 tmp;
4101         unsigned long s = c->bit_offset;
4102         unsigned long e;
4103         int toggle = dcbp_get_start(p);
4104         int have;
4105         int bits;
4106
4107         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4108
4109         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4110         if (bits < 0)
4111                 return -EIO;
4112
4113         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4114                 bits = vli_decode_bits(&rl, look_ahead);
4115                 if (bits <= 0)
4116                         return -EIO;
4117
4118                 if (toggle) {
4119                         e = s + rl - 1;
4120                         if (e >= c->bm_bits) {
4121                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4122                                 return -EIO;
4123                         }
4124                         _drbd_bm_set_bits(mdev, s, e);
4125                 }
4126
4127                 if (have < bits) {
4128                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4129                                 have, bits, look_ahead,
4130                                 (unsigned int)(bs.cur.b - p->code),
4131                                 (unsigned int)bs.buf_len);
4132                         return -EIO;
4133                 }
4134                 look_ahead >>= bits;
4135                 have -= bits;
4136
4137                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4138                 if (bits < 0)
4139                         return -EIO;
4140                 look_ahead |= tmp << have;
4141                 have += bits;
4142         }
4143
4144         c->bit_offset = s;
4145         bm_xfer_ctx_bit_to_word_offset(c);
4146
4147         return (s != c->bm_bits);
4148 }
4149
4150 /**
4151  * decode_bitmap_c
4152  *
4153  * Return 0 when done, 1 when another iteration is needed, and a negative error
4154  * code upon failure.
4155  */
4156 static int
4157 decode_bitmap_c(struct drbd_conf *mdev,
4158                 struct p_compressed_bm *p,
4159                 struct bm_xfer_ctx *c,
4160                 unsigned int len)
4161 {
4162         if (dcbp_get_code(p) == RLE_VLI_Bits)
4163                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4164
4165         /* other variants had been implemented for evaluation,
4166          * but have been dropped as this one turned out to be "best"
4167          * during all our tests. */
4168
4169         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4170         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4171         return -EIO;
4172 }
4173
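     /* "plain" below is what an uncompressed transfer of the same bitmap would
      * have cost: the bitmap words themselves plus one header per data packet.
      * The figure reported as "compression" is 100 * (1 - total/plain), i.e.
      * how much of the plain size was saved, printed with one decimal place. */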
4174 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4175                 const char *direction, struct bm_xfer_ctx *c)
4176 {
4177         /* what would it take to transfer it "plaintext" */
4178         unsigned int header_size = drbd_header_size(mdev->tconn);
4179         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4180         unsigned int plain =
4181                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4182                 c->bm_words * sizeof(unsigned long);
4183         unsigned int total = c->bytes[0] + c->bytes[1];
4184         unsigned int r;
4185
4186         /* total cannot be zero. But just in case: */
4187         if (total == 0)
4188                 return;
4189
4190         /* don't report if not compressed */
4191         if (total >= plain)
4192                 return;
4193
4194         /* total < plain. check for overflow, still */
4195         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4196                                     : (1000 * total / plain);
4197
4198         if (r > 1000)
4199                 r = 1000;
4200
4201         r = 1000 - r;
4202         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4203              "total %u; compression: %u.%u%%\n",
4204                         direction,
4205                         c->bytes[1], c->packets[1],
4206                         c->bytes[0], c->packets[0],
4207                         total, r/10, r % 10);
4208 }
4209
4210 /* Since we are processing the bitfield from lower addresses to higher,
4211    it does not matter if we process it in 32 bit chunks or 64 bit
4212    chunks as long as it is little endian. (Understand it as byte stream,
4213    beginning with the lowest byte...) If we used big endian
4214    we would need to process it from the highest address to the lowest,
4215    in order to be agnostic to the 32 vs 64 bits issue.
4216
4217    Returns 0 on success, or a negative error code otherwise. */
4218 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4219 {
4220         struct drbd_conf *mdev;
4221         struct bm_xfer_ctx c;
4222         int err;
4223
4224         mdev = vnr_to_mdev(tconn, pi->vnr);
4225         if (!mdev)
4226                 return -EIO;
4227
4228         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4229         /* you are supposed to send additional out-of-sync information
4230          * if you actually set bits during this phase */
4231
4232         c = (struct bm_xfer_ctx) {
4233                 .bm_bits = drbd_bm_bits(mdev),
4234                 .bm_words = drbd_bm_words(mdev),
4235         };
4236
4237         for (;;) {
4238                 if (pi->cmd == P_BITMAP)
4239                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4240                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4241                         /* MAYBE: sanity check that we speak proto >= 90,
4242                          * and the feature is enabled! */
4243                         struct p_compressed_bm *p = pi->data;
4244
4245                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4246                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4247                                 err = -EIO;
4248                                 goto out;
4249                         }
4250                         if (pi->size <= sizeof(*p)) {
4251                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4252                                 err = -EIO;
4253                                 goto out;
4254                         }
4255                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4256                         if (err)
4257                                goto out;
4258                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4259                 } else {
4260                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4261                         err = -EIO;
4262                         goto out;
4263                 }
4264
4265                 c.packets[pi->cmd == P_BITMAP]++;
4266                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4267
4268                 if (err <= 0) {
4269                         if (err < 0)
4270                                 goto out;
4271                         break;
4272                 }
4273                 err = drbd_recv_header(mdev->tconn, pi);
4274                 if (err)
4275                         goto out;
4276         }
4277
4278         INFO_bm_xfer_stats(mdev, "receive", &c);
4279
4280         if (mdev->state.conn == C_WF_BITMAP_T) {
4281                 enum drbd_state_rv rv;
4282
4283                 err = drbd_send_bitmap(mdev);
4284                 if (err)
4285                         goto out;
4286                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4287                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4288                 D_ASSERT(rv == SS_SUCCESS);
4289         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4290                 /* admin may have requested C_DISCONNECTING,
4291                  * other threads may have noticed network errors */
4292                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4293                     drbd_conn_str(mdev->state.conn));
4294         }
4295         err = 0;
4296
4297  out:
4298         drbd_bm_unlock(mdev);
4299         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4300                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4301         return err;
4302 }
4303
4304 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4305 {
4306         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4307                  pi->cmd, pi->size);
4308
4309         return ignore_remaining_packet(tconn, pi);
4310 }
4311
4312 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4313 {
4314         /* Make sure we've acked all the TCP data associated
4315          * with the data requests being unplugged */
4316         drbd_tcp_quickack(tconn->data.socket);
4317
4318         return 0;
4319 }
4320
4321 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4322 {
4323         struct drbd_conf *mdev;
4324         struct p_block_desc *p = pi->data;
4325
4326         mdev = vnr_to_mdev(tconn, pi->vnr);
4327         if (!mdev)
4328                 return -EIO;
4329
4330         switch (mdev->state.conn) {
4331         case C_WF_SYNC_UUID:
4332         case C_WF_BITMAP_T:
4333         case C_BEHIND:
4334                 break;
4335         default:
4336                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4337                                 drbd_conn_str(mdev->state.conn));
4338         }
4339
4340         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4341
4342         return 0;
4343 }
4344
4345 struct data_cmd {
4346         int expect_payload;
4347         size_t pkt_size;
4348         int (*fn)(struct drbd_tconn *, struct packet_info *);
4349 };
4350
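     /* Dispatch table for the data socket, indexed by packet type.  pkt_size
      * is the size of the fixed sub-header that drbdd() reads into the socket
      * buffer before calling fn(); expect_payload says whether further payload
      * beyond that sub-header is allowed (the handler consumes it itself). */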
4351 static struct data_cmd drbd_cmd_handler[] = {
4352         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4353         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4354         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4355         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4356         [P_BITMAP]          = { 1, 0, receive_bitmap },
4357         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4358         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4359         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4360         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4361         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4362         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4363         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4364         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4365         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4366         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4367         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4368         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4369         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4370         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4371         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4372         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4373         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4374         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4375         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4376 };
4377
4378 static void drbdd(struct drbd_tconn *tconn)
4379 {
4380         struct packet_info pi;
4381         size_t shs; /* sub header size */
4382         int err;
4383
4384         while (get_t_state(&tconn->receiver) == RUNNING) {
4385                 struct data_cmd *cmd;
4386
4387                 drbd_thread_current_set_cpu(&tconn->receiver);
4388                 if (drbd_recv_header(tconn, &pi))
4389                         goto err_out;
4390
4391                 cmd = &drbd_cmd_handler[pi.cmd];
4392                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4393                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4394                                  cmdname(pi.cmd), pi.cmd);
4395                         goto err_out;
4396                 }
4397
4398                 shs = cmd->pkt_size;
4399                 if (pi.size > shs && !cmd->expect_payload) {
4400                         conn_err(tconn, "No payload expected %s l:%d\n",
4401                                  cmdname(pi.cmd), pi.size);
4402                         goto err_out;
4403                 }
4404
4405                 if (shs) {
4406                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4407                         if (err)
4408                                 goto err_out;
4409                         pi.size -= shs;
4410                 }
4411
4412                 err = cmd->fn(tconn, &pi);
4413                 if (err) {
4414                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4415                                  cmdname(pi.cmd), err, pi.size);
4416                         goto err_out;
4417                 }
4418         }
4419         return;
4420
4421     err_out:
4422         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4423 }
4424
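     /* Queue a no-op barrier work item on the sender work queue and wait for
      * it to be processed.  Queued work is handled in order, so once the
      * barrier completes, all previously queued work has been processed. */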
4425 void conn_flush_workqueue(struct drbd_tconn *tconn)
4426 {
4427         struct drbd_wq_barrier barr;
4428
4429         barr.w.cb = w_prev_work_done;
4430         barr.w.tconn = tconn;
4431         init_completion(&barr.done);
4432         drbd_queue_work(&tconn->sender_work, &barr.w);
4433         wait_for_completion(&barr.done);
4434 }
4435
4436 static void conn_disconnect(struct drbd_tconn *tconn)
4437 {
4438         struct drbd_conf *mdev;
4439         enum drbd_conns oc;
4440         int vnr;
4441
4442         if (tconn->cstate == C_STANDALONE)
4443                 return;
4444
4445         /* We are about to start the cleanup after connection loss.
4446          * Make sure drbd_make_request knows about that.
4447          * Usually we should be in some network failure state already,
4448          * but just in case we are not, we fix it up here.
4449          */
4450         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4451
4452         /* The asender does not clean up anything. It must not interfere, either. */
4453         drbd_thread_stop(&tconn->asender);
4454         drbd_free_sock(tconn);
4455
4456         rcu_read_lock();
4457         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4458                 kref_get(&mdev->kref);
4459                 rcu_read_unlock();
4460                 drbd_disconnected(mdev);
4461                 kref_put(&mdev->kref, &drbd_minor_destroy);
4462                 rcu_read_lock();
4463         }
4464         rcu_read_unlock();
4465
4466         if (!list_empty(&tconn->current_epoch->list))
4467                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4468         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4469         atomic_set(&tconn->current_epoch->epoch_size, 0);
4470         tconn->send.seen_any_write_yet = false;
4471
4472         conn_info(tconn, "Connection closed\n");
4473
4474         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4475                 conn_try_outdate_peer_async(tconn);
4476
4477         spin_lock_irq(&tconn->req_lock);
4478         oc = tconn->cstate;
4479         if (oc >= C_UNCONNECTED)
4480                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4481
4482         spin_unlock_irq(&tconn->req_lock);
4483
4484         if (oc == C_DISCONNECTING)
4485                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4486 }
4487
4488 static int drbd_disconnected(struct drbd_conf *mdev)
4489 {
4490         unsigned int i;
4491
4492         /* wait for current activity to cease. */
4493         spin_lock_irq(&mdev->tconn->req_lock);
4494         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4495         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4496         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4497         spin_unlock_irq(&mdev->tconn->req_lock);
4498
4499         /* We do not have data structures that would allow us to
4500          * get the rs_pending_cnt down to 0 again.
4501          *  * On C_SYNC_TARGET we do not have any data structures describing
4502          *    the pending RSDataRequest's we have sent.
4503          *  * On C_SYNC_SOURCE there is no data structure that tracks
4504          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4505          *  And no, it is not the sum of the reference counts in the
4506          *  resync_LRU. The resync_LRU tracks the whole operation including
4507          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4508          *  on the fly. */
4509         drbd_rs_cancel_all(mdev);
4510         mdev->rs_total = 0;
4511         mdev->rs_failed = 0;
4512         atomic_set(&mdev->rs_pending_cnt, 0);
4513         wake_up(&mdev->misc_wait);
4514
4515         del_timer_sync(&mdev->resync_timer);
4516         resync_timer_fn((unsigned long)mdev);
4517
4518         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4519          * w_make_resync_request etc. which may still be on the worker queue
4520          * to be "canceled" */
4521         drbd_flush_workqueue(mdev);
4522
4523         drbd_finish_peer_reqs(mdev);
4524
4525         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4526            might have queued additional work. The one before drbd_finish_peer_reqs() is
4527            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4528         drbd_flush_workqueue(mdev);
4529
4530         kfree(mdev->p_uuid);
4531         mdev->p_uuid = NULL;
4532
4533         if (!drbd_suspended(mdev))
4534                 tl_clear(mdev->tconn);
4535
4536         drbd_md_sync(mdev);
4537
4538         /* serialize with bitmap writeout triggered by the state change,
4539          * if any. */
4540         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4541
4542         /* tcp_close and release of sendpage pages can be deferred.  I don't
4543          * want to use SO_LINGER, because apparently it can be deferred for
4544          * more than 20 seconds (longest time I checked).
4545          *
4546          * Actually we don't care exactly when the network stack does its
4547          * put_page(); we just release our reference on these pages right here.
4548          */
4549         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4550         if (i)
4551                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4552         i = atomic_read(&mdev->pp_in_use_by_net);
4553         if (i)
4554                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4555         i = atomic_read(&mdev->pp_in_use);
4556         if (i)
4557                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4558
4559         D_ASSERT(list_empty(&mdev->read_ee));
4560         D_ASSERT(list_empty(&mdev->active_ee));
4561         D_ASSERT(list_empty(&mdev->sync_ee));
4562         D_ASSERT(list_empty(&mdev->done_ee));
4563
4564         return 0;
4565 }
4566
4567 /*
4568  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4569  * we can agree on is stored in agreed_pro_version.
4570  *
4571  * The feature flags and the reserved array should provide enough room for
4572  * future enhancements of the handshake protocol and possible plugins...
4573  *
4574  * For now, they are expected to be zero, but are ignored.
4575  */
4576 static int drbd_send_features(struct drbd_tconn *tconn)
4577 {
4578         struct drbd_socket *sock;
4579         struct p_connection_features *p;
4580
4581         sock = &tconn->data;
4582         p = conn_prepare_command(tconn, sock);
4583         if (!p)
4584                 return -EIO;
4585         memset(p, 0, sizeof(*p));
4586         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4587         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4588         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4589 }
4590
4591 /*
4592  * return values:
4593  *   1 yes, we have a valid connection
4594  *   0 oops, did not work out, please try again
4595  *  -1 peer talks different language,
4596  *     no point in trying again, please go standalone.
4597  */
4598 static int drbd_do_features(struct drbd_tconn *tconn)
4599 {
4600         /* ASSERT current == tconn->receiver ... */
4601         struct p_connection_features *p;
4602         const int expect = sizeof(struct p_connection_features);
4603         struct packet_info pi;
4604         int err;
4605
4606         err = drbd_send_features(tconn);
4607         if (err)
4608                 return 0;
4609
4610         err = drbd_recv_header(tconn, &pi);
4611         if (err)
4612                 return 0;
4613
4614         if (pi.cmd != P_CONNECTION_FEATURES) {
4615                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4616                          cmdname(pi.cmd), pi.cmd);
4617                 return -1;
4618         }
4619
4620         if (pi.size != expect) {
4621                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4622                      expect, pi.size);
4623                 return -1;
4624         }
4625
4626         p = pi.data;
4627         err = drbd_recv_all_warn(tconn, p, expect);
4628         if (err)
4629                 return 0;
4630
4631         p->protocol_min = be32_to_cpu(p->protocol_min);
4632         p->protocol_max = be32_to_cpu(p->protocol_max);
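             /* A peer that does not fill in protocol_max sends 0 here;
              * presumably it speaks exactly protocol_min, so treat it that way. */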
4633         if (p->protocol_max == 0)
4634                 p->protocol_max = p->protocol_min;
4635
4636         if (PRO_VERSION_MAX < p->protocol_min ||
4637             PRO_VERSION_MIN > p->protocol_max)
4638                 goto incompat;
4639
4640         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4641
4642         conn_info(tconn, "Handshake successful: "
4643              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4644
4645         return 1;
4646
4647  incompat:
4648         conn_err(tconn, "incompatible DRBD dialects: "
4649             "I support %d-%d, peer supports %d-%d\n",
4650             PRO_VERSION_MIN, PRO_VERSION_MAX,
4651             p->protocol_min, p->protocol_max);
4652         return -1;
4653 }
4654
4655 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4656 static int drbd_do_auth(struct drbd_tconn *tconn)
4657 {
4658         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4659         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4660         return -1;
4661 }
4662 #else
4663 #define CHALLENGE_LEN 64
4664
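     /* Challenge/response authentication using the shared secret; both peers
      * run the same exchange: send our random challenge, receive the peer's
      * challenge, answer it with HMAC(secret, peer's challenge), then receive
      * the peer's answer and compare it against HMAC(secret, our challenge). */
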
4665 /* Return value:
4666         1 - auth succeeded,
4667         0 - failed, try again (network error),
4668         -1 - auth failed, don't try again.
4669 */
4670
4671 static int drbd_do_auth(struct drbd_tconn *tconn)
4672 {
4673         struct drbd_socket *sock;
4674         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4675         struct scatterlist sg;
4676         char *response = NULL;
4677         char *right_response = NULL;
4678         char *peers_ch = NULL;
4679         unsigned int key_len;
4680         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4681         unsigned int resp_size;
4682         struct hash_desc desc;
4683         struct packet_info pi;
4684         struct net_conf *nc;
4685         int err, rv;
4686
4687         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4688
4689         rcu_read_lock();
4690         nc = rcu_dereference(tconn->net_conf);
4691         key_len = strlen(nc->shared_secret);
4692         memcpy(secret, nc->shared_secret, key_len);
4693         rcu_read_unlock();
4694
4695         desc.tfm = tconn->cram_hmac_tfm;
4696         desc.flags = 0;
4697
4698         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4699         if (rv) {
4700                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4701                 rv = -1;
4702                 goto fail;
4703         }
4704
4705         get_random_bytes(my_challenge, CHALLENGE_LEN);
4706
4707         sock = &tconn->data;
4708         if (!conn_prepare_command(tconn, sock)) {
4709                 rv = 0;
4710                 goto fail;
4711         }
4712         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4713                                 my_challenge, CHALLENGE_LEN);
4714         if (!rv)
4715                 goto fail;
4716
4717         err = drbd_recv_header(tconn, &pi);
4718         if (err) {
4719                 rv = 0;
4720                 goto fail;
4721         }
4722
4723         if (pi.cmd != P_AUTH_CHALLENGE) {
4724                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4725                          cmdname(pi.cmd), pi.cmd);
4726                 rv = 0;
4727                 goto fail;
4728         }
4729
4730         if (pi.size > CHALLENGE_LEN * 2) {
4731                 conn_err(tconn, "AuthChallenge payload too big.\n");
4732                 rv = -1;
4733                 goto fail;
4734         }
4735
4736         peers_ch = kmalloc(pi.size, GFP_NOIO);
4737         if (peers_ch == NULL) {
4738                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4739                 rv = -1;
4740                 goto fail;
4741         }
4742
4743         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4744         if (err) {
4745                 rv = 0;
4746                 goto fail;
4747         }
4748
4749         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4750         response = kmalloc(resp_size, GFP_NOIO);
4751         if (response == NULL) {
4752                 conn_err(tconn, "kmalloc of response failed\n");
4753                 rv = -1;
4754                 goto fail;
4755         }
4756
4757         sg_init_table(&sg, 1);
4758         sg_set_buf(&sg, peers_ch, pi.size);
4759
4760         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4761         if (rv) {
4762                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4763                 rv = -1;
4764                 goto fail;
4765         }
4766
4767         if (!conn_prepare_command(tconn, sock)) {
4768                 rv = 0;
4769                 goto fail;
4770         }
4771         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4772                                 response, resp_size);
4773         if (!rv)
4774                 goto fail;
4775
4776         err = drbd_recv_header(tconn, &pi);
4777         if (err) {
4778                 rv = 0;
4779                 goto fail;
4780         }
4781
4782         if (pi.cmd != P_AUTH_RESPONSE) {
4783                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4784                          cmdname(pi.cmd), pi.cmd);
4785                 rv = 0;
4786                 goto fail;
4787         }
4788
4789         if (pi.size != resp_size) {
4790                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4791                 rv = 0;
4792                 goto fail;
4793         }
4794
4795         err = drbd_recv_all_warn(tconn, response, resp_size);
4796         if (err) {
4797                 rv = 0;
4798                 goto fail;
4799         }
4800
4801         right_response = kmalloc(resp_size, GFP_NOIO);
4802         if (right_response == NULL) {
4803                 conn_err(tconn, "kmalloc of right_response failed\n");
4804                 rv = -1;
4805                 goto fail;
4806         }
4807
4808         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4809
4810         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4811         if (rv) {
4812                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4813                 rv = -1;
4814                 goto fail;
4815         }
4816
4817         rv = !memcmp(response, right_response, resp_size);
4818
4819         if (rv)
4820                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4821                      resp_size);
4822         else
4823                 rv = -1;
4824
4825  fail:
4826         kfree(peers_ch);
4827         kfree(response);
4828         kfree(right_response);
4829
4830         return rv;
4831 }
4832 #endif
4833
4834 int drbdd_init(struct drbd_thread *thi)
4835 {
4836         struct drbd_tconn *tconn = thi->tconn;
4837         int h;
4838
4839         conn_info(tconn, "receiver (re)started\n");
4840
4841         do {
4842                 h = conn_connect(tconn);
4843                 if (h == 0) {
4844                         conn_disconnect(tconn);
4845                         schedule_timeout_interruptible(HZ);
4846                 }
4847                 if (h == -1) {
4848                         conn_warn(tconn, "Discarding network configuration.\n");
4849                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4850                 }
4851         } while (h == 0);
4852
4853         if (h > 0)
4854                 drbdd(tconn);
4855
4856         conn_disconnect(tconn);
4857
4858         conn_info(tconn, "receiver terminated\n");
4859         return 0;
4860 }
4861
4862 /* ********* acknowledge sender ******** */
4863
4864 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4865 {
4866         struct p_req_state_reply *p = pi->data;
4867         int retcode = be32_to_cpu(p->retcode);
4868
4869         if (retcode >= SS_SUCCESS) {
4870                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4871         } else {
4872                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4873                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4874                          drbd_set_st_err_str(retcode), retcode);
4875         }
4876         wake_up(&tconn->ping_wait);
4877
4878         return 0;
4879 }
4880
4881 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4882 {
4883         struct drbd_conf *mdev;
4884         struct p_req_state_reply *p = pi->data;
4885         int retcode = be32_to_cpu(p->retcode);
4886
4887         mdev = vnr_to_mdev(tconn, pi->vnr);
4888         if (!mdev)
4889                 return -EIO;
4890
4891         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4892                 D_ASSERT(tconn->agreed_pro_version < 100);
4893                 return got_conn_RqSReply(tconn, pi);
4894         }
4895
4896         if (retcode >= SS_SUCCESS) {
4897                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4898         } else {
4899                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4900                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4901                         drbd_set_st_err_str(retcode), retcode);
4902         }
4903         wake_up(&mdev->state_wait);
4904
4905         return 0;
4906 }
4907
4908 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4909 {
4910         return drbd_send_ping_ack(tconn);
4911
4912 }
4913
4914 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4915 {
4916         /* restore idle timeout */
4917         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4918         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4919                 wake_up(&tconn->ping_wait);
4920
4921         return 0;
4922 }
4923
4924 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4925 {
4926         struct drbd_conf *mdev;
4927         struct p_block_ack *p = pi->data;
4928         sector_t sector = be64_to_cpu(p->sector);
4929         int blksize = be32_to_cpu(p->blksize);
4930
4931         mdev = vnr_to_mdev(tconn, pi->vnr);
4932         if (!mdev)
4933                 return -EIO;
4934
4935         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4936
4937         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4938
4939         if (get_ldev(mdev)) {
4940                 drbd_rs_complete_io(mdev, sector);
4941                 drbd_set_in_sync(mdev, sector, blksize);
4942                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4943                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4944                 put_ldev(mdev);
4945         }
4946         dec_rs_pending(mdev);
4947         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4948
4949         return 0;
4950 }
4951
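     /* Look up the request identified by (id, sector) in the given tree while
      * holding req_lock, feed the state machine event "what" into it, and, if
      * that completed the master bio, complete that bio outside the lock. */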
4952 static int
4953 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4954                               struct rb_root *root, const char *func,
4955                               enum drbd_req_event what, bool missing_ok)
4956 {
4957         struct drbd_request *req;
4958         struct bio_and_error m;
4959
4960         spin_lock_irq(&mdev->tconn->req_lock);
4961         req = find_request(mdev, root, id, sector, missing_ok, func);
4962         if (unlikely(!req)) {
4963                 spin_unlock_irq(&mdev->tconn->req_lock);
4964                 return -EIO;
4965         }
4966         __req_mod(req, what, &m);
4967         spin_unlock_irq(&mdev->tconn->req_lock);
4968
4969         if (m.bio)
4970                 complete_master_bio(mdev, &m);
4971         return 0;
4972 }
4973
4974 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4975 {
4976         struct drbd_conf *mdev;
4977         struct p_block_ack *p = pi->data;
4978         sector_t sector = be64_to_cpu(p->sector);
4979         int blksize = be32_to_cpu(p->blksize);
4980         enum drbd_req_event what;
4981
4982         mdev = vnr_to_mdev(tconn, pi->vnr);
4983         if (!mdev)
4984                 return -EIO;
4985
4986         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4987
4988         if (p->block_id == ID_SYNCER) {
4989                 drbd_set_in_sync(mdev, sector, blksize);
4990                 dec_rs_pending(mdev);
4991                 return 0;
4992         }
4993         switch (pi->cmd) {
4994         case P_RS_WRITE_ACK:
4995                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4996                 break;
4997         case P_WRITE_ACK:
4998                 what = WRITE_ACKED_BY_PEER;
4999                 break;
5000         case P_RECV_ACK:
5001                 what = RECV_ACKED_BY_PEER;
5002                 break;
5003         case P_SUPERSEDED:
5004                 what = CONFLICT_RESOLVED;
5005                 break;
5006         case P_RETRY_WRITE:
5007                 what = POSTPONE_WRITE;
5008                 break;
5009         default:
5010                 BUG();
5011         }
5012
5013         return validate_req_change_req_state(mdev, p->block_id, sector,
5014                                              &mdev->write_requests, __func__,
5015                                              what, false);
5016 }
5017
5018 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5019 {
5020         struct drbd_conf *mdev;
5021         struct p_block_ack *p = pi->data;
5022         sector_t sector = be64_to_cpu(p->sector);
5023         int size = be32_to_cpu(p->blksize);
5024         int err;
5025
5026         mdev = vnr_to_mdev(tconn, pi->vnr);
5027         if (!mdev)
5028                 return -EIO;
5029
5030         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5031
5032         if (p->block_id == ID_SYNCER) {
5033                 dec_rs_pending(mdev);
5034                 drbd_rs_failed_io(mdev, sector, size);
5035                 return 0;
5036         }
5037
5038         err = validate_req_change_req_state(mdev, p->block_id, sector,
5039                                             &mdev->write_requests, __func__,
5040                                             NEG_ACKED, true);
5041         if (err) {
5042                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5043                    The master bio might already be completed, therefore the
5044                    request is no longer in the collision hash. */
5045                 /* In Protocol B we might already have got a P_RECV_ACK
5046                    but then get a P_NEG_ACK afterwards. */
5047                 drbd_set_out_of_sync(mdev, sector, size);
5048         }
5049         return 0;
5050 }
5051
5052 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5053 {
5054         struct drbd_conf *mdev;
5055         struct p_block_ack *p = pi->data;
5056         sector_t sector = be64_to_cpu(p->sector);
5057
5058         mdev = vnr_to_mdev(tconn, pi->vnr);
5059         if (!mdev)
5060                 return -EIO;
5061
5062         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5063
5064         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5065             (unsigned long long)sector, be32_to_cpu(p->blksize));
5066
5067         return validate_req_change_req_state(mdev, p->block_id, sector,
5068                                              &mdev->read_requests, __func__,
5069                                              NEG_ACKED, false);
5070 }
5071
5072 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5073 {
5074         struct drbd_conf *mdev;
5075         sector_t sector;
5076         int size;
5077         struct p_block_ack *p = pi->data;
5078
5079         mdev = vnr_to_mdev(tconn, pi->vnr);
5080         if (!mdev)
5081                 return -EIO;
5082
5083         sector = be64_to_cpu(p->sector);
5084         size = be32_to_cpu(p->blksize);
5085
5086         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5087
5088         dec_rs_pending(mdev);
5089
5090         if (get_ldev_if_state(mdev, D_FAILED)) {
5091                 drbd_rs_complete_io(mdev, sector);
5092                 switch (pi->cmd) {
5093                 case P_NEG_RS_DREPLY:
5094                         drbd_rs_failed_io(mdev, sector, size);
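                             /* fall through: unlike P_RS_CANCEL, a NegRSDReply
                              * also marks the block as failed resync io. */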
5095                 case P_RS_CANCEL:
5096                         break;
5097                 default:
5098                         BUG();
5099                 }
5100                 put_ldev(mdev);
5101         }
5102
5103         return 0;
5104 }
5105
5106 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5107 {
5108         struct p_barrier_ack *p = pi->data;
5109         struct drbd_conf *mdev;
5110         int vnr;
5111
5112         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5113
5114         rcu_read_lock();
5115         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5116                 if (mdev->state.conn == C_AHEAD &&
5117                     atomic_read(&mdev->ap_in_flight) == 0 &&
5118                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5119                         mdev->start_resync_timer.expires = jiffies + HZ;
5120                         add_timer(&mdev->start_resync_timer);
5121                 }
5122         }
5123         rcu_read_unlock();
5124
5125         return 0;
5126 }
5127
5128 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5129 {
5130         struct drbd_conf *mdev;
5131         struct p_block_ack *p = pi->data;
5132         struct drbd_work *w;
5133         sector_t sector;
5134         int size;
5135
5136         mdev = vnr_to_mdev(tconn, pi->vnr);
5137         if (!mdev)
5138                 return -EIO;
5139
5140         sector = be64_to_cpu(p->sector);
5141         size = be32_to_cpu(p->blksize);
5142
5143         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5144
5145         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5146                 drbd_ov_out_of_sync_found(mdev, sector, size);
5147         else
5148                 ov_out_of_sync_print(mdev);
5149
5150         if (!get_ldev(mdev))
5151                 return 0;
5152
5153         drbd_rs_complete_io(mdev, sector);
5154         dec_rs_pending(mdev);
5155
5156         --mdev->ov_left;
5157
5158         /* let's advance progress step marks only for every other megabyte */
5159         if ((mdev->ov_left & 0x200) == 0x200)
5160                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5161
5162         if (mdev->ov_left == 0) {
5163                 w = kmalloc(sizeof(*w), GFP_NOIO);
5164                 if (w) {
5165                         w->cb = w_ov_finished;
5166                         w->mdev = mdev;
5167                         drbd_queue_work(&mdev->tconn->sender_work, w);
5168                 } else {
5169                         dev_err(DEV, "kmalloc(w) failed.");
5170                         ov_out_of_sync_print(mdev);
5171                         drbd_resync_finished(mdev);
5172                 }
5173         }
5174         put_ldev(mdev);
5175         return 0;
5176 }
5177
5178 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5179 {
5180         return 0;
5181 }
5182
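     /* Walk all volumes of this connection and complete their finished peer
      * requests (done_ee).  Repeat until, with req_lock held, every done_ee
      * list is observed empty, so nothing queued meanwhile is missed. */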
5183 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5184 {
5185         struct drbd_conf *mdev;
5186         int vnr, not_empty = 0;
5187
5188         do {
5189                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5190                 flush_signals(current);
5191
5192                 rcu_read_lock();
5193                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5194                         kref_get(&mdev->kref);
5195                         rcu_read_unlock();
5196                         if (drbd_finish_peer_reqs(mdev)) {
5197                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5198                                 return 1;
5199                         }
5200                         kref_put(&mdev->kref, &drbd_minor_destroy);
5201                         rcu_read_lock();
5202                 }
5203                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5204
5205                 spin_lock_irq(&tconn->req_lock);
5206                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5207                         not_empty = !list_empty(&mdev->done_ee);
5208                         if (not_empty)
5209                                 break;
5210                 }
5211                 spin_unlock_irq(&tconn->req_lock);
5212                 rcu_read_unlock();
5213         } while (not_empty);
5214
5215         return 0;
5216 }
5217
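     /* Handlers for packets arriving on the meta socket.  Unlike in
      * drbd_cmd_handler[], pkt_size is the complete payload following the
      * header; drbd_asender() receives header plus pkt_size bytes before
      * dispatching to fn(). */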
5218 struct asender_cmd {
5219         size_t pkt_size;
5220         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5221 };
5222
5223 static struct asender_cmd asender_tbl[] = {
5224         [P_PING]            = { 0, got_Ping },
5225         [P_PING_ACK]        = { 0, got_PingAck },
5226         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5227         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5228         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5229         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5230         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5231         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5232         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5233         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5234         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5235         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5236         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5237         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5238         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5239         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5240         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5241 };
5242
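     /* The asender thread sends pings and processes acknowledgements on the
      * meta socket.  The socket receive timeout doubles as a timer: it is set
      * to the ping timeout while a PingAck is outstanding and to the ping
      * interval otherwise, so -EAGAIN from drbd_recv_short() means either
      * "time to send the next ping" or "the PingAck is overdue". */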
5243 int drbd_asender(struct drbd_thread *thi)
5244 {
5245         struct drbd_tconn *tconn = thi->tconn;
5246         struct asender_cmd *cmd = NULL;
5247         struct packet_info pi;
5248         int rv;
5249         void *buf    = tconn->meta.rbuf;
5250         int received = 0;
5251         unsigned int header_size = drbd_header_size(tconn);
5252         int expect   = header_size;
5253         bool ping_timeout_active = false;
5254         struct net_conf *nc;
5255         int ping_timeo, tcp_cork, ping_int;
5256
5257         current->policy = SCHED_RR;  /* Make this a realtime task! */
5258         current->rt_priority = 2;    /* more important than all other tasks */
5259
5260         while (get_t_state(thi) == RUNNING) {
5261                 drbd_thread_current_set_cpu(thi);
5262
5263                 rcu_read_lock();
5264                 nc = rcu_dereference(tconn->net_conf);
5265                 ping_timeo = nc->ping_timeo;
5266                 tcp_cork = nc->tcp_cork;
5267                 ping_int = nc->ping_int;
5268                 rcu_read_unlock();
5269
5270                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5271                         if (drbd_send_ping(tconn)) {
5272                                 conn_err(tconn, "drbd_send_ping has failed\n");
5273                                 goto reconnect;
5274                         }
5275                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5276                         ping_timeout_active = true;
5277                 }
5278
5279                 /* TODO: conditionally cork; it may hurt latency if we cork without
5280                    much to send */
5281                 if (tcp_cork)
5282                         drbd_tcp_cork(tconn->meta.socket);
5283                 if (tconn_finish_peer_reqs(tconn)) {
5284                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5285                         goto reconnect;
5286                 }
5287                 /* but unconditionally uncork unless disabled */
5288                 if (tcp_cork)
5289                         drbd_tcp_uncork(tconn->meta.socket);
5290
5291                 /* short circuit, recv_msg would return EINTR anyways. */
5292                 if (signal_pending(current))
5293                         continue;
5294
5295                 rv = drbd_recv_short(tconn->meta.socket, buf, expect - received, 0);
5296                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5297
5298                 flush_signals(current);
5299
5300                 /* Note:
5301                  * -EINTR        (on meta) we got a signal
5302                  * -EAGAIN       (on meta) rcvtimeo expired
5303                  * -ECONNRESET   other side closed the connection
5304                  * -ERESTARTSYS  (on data) we got a signal
5305                  * rv <  0       other than above: unexpected error!
5306                  * rv == expected: full header or command
5307                  * rv <  expected: "woken" by signal during receive
5308                  * rv == 0       : "connection shut down by peer"
5309                  */
5310                 if (likely(rv > 0)) {
5311                         received += rv;
5312                         buf      += rv;
5313                 } else if (rv == 0) {
5314                         conn_err(tconn, "meta connection shut down by peer.\n");
5315                         goto reconnect;
5316                 } else if (rv == -EAGAIN) {
5317                         /* If the data socket received something meanwhile,
5318                          * that is good enough: peer is still alive. */
5319                         if (time_after(tconn->last_received,
5320                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5321                                 continue;
5322                         if (ping_timeout_active) {
5323                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5324                                 goto reconnect;
5325                         }
5326                         set_bit(SEND_PING, &tconn->flags);
5327                         continue;
5328                 } else if (rv == -EINTR) {
5329                         continue;
5330                 } else {
5331                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5332                         goto reconnect;
5333                 }
5334
5335                 if (received == expect && cmd == NULL) {
5336                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5337                                 goto reconnect;
5338                         cmd = &asender_tbl[pi.cmd];
5339                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5340                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5341                                          cmdname(pi.cmd), pi.cmd);
5342                                 goto disconnect;
5343                         }
5344                         expect = header_size + cmd->pkt_size;
5345                         if (pi.size != expect - header_size) {
5346                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5347                                         pi.cmd, pi.size);
5348                                 goto reconnect;
5349                         }
5350                 }
5351                 if (received == expect) {
5352                         int err;
5353
5354                         err = cmd->fn(tconn, &pi);
5355                         if (err) {
5356                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5357                                 goto reconnect;
5358                         }
5359
5360                         tconn->last_received = jiffies;
5361
5362                         if (cmd == &asender_tbl[P_PING_ACK]) {
5363                                 /* restore idle timeout */
5364                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5365                                 ping_timeout_active = false;
5366                         }
5367
5368                         buf      = tconn->meta.rbuf;
5369                         received = 0;
5370                         expect   = header_size;
5371                         cmd      = NULL;
5372                 }
5373         }
5374
5375         if (0) {
5376 reconnect:
5377                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5378         }
5379         if (0) {
5380 disconnect:
5381                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5382         }
5383         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5384
5385         conn_info(tconn, "asender terminated\n");
5386
5387         return 0;
5388 }