1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
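/*
 * Illustrative sketch (not part of the original file): how such a chain is
 * walked.  page_chain_for_each() and page_chain_next() are the helpers from
 * drbd_int.h; example_page_chain_len() is a made-up name for illustration.
 */
static inline unsigned int example_page_chain_len(struct page *page)
{
	unsigned int len = 0;

	page_chain_for_each(page)
		len++;
	return len;
}
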
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that has not
208            finished, we can stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         DEFINE_WAIT(wait);
248
249         /* Yes, we may run up to @number over max_buffers. If we
250          * follow it strictly, the admin will get it wrong anyways. */
251         if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
252                 page = __drbd_alloc_pages(mdev, number);
253
254         while (page == NULL) {
255                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
256
257                 drbd_kick_lo_and_reclaim_net(mdev);
258
259                 if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
260                         page = __drbd_alloc_pages(mdev, number);
261                         if (page)
262                                 break;
263                 }
264
265                 if (!retry)
266                         break;
267
268                 if (signal_pending(current)) {
269                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
270                         break;
271                 }
272
273                 schedule();
274         }
275         finish_wait(&drbd_pp_wait, &wait);
276
277         if (page)
278                 atomic_add(number, &mdev->pp_in_use);
279         return page;
280 }
281
282 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
283  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
284  * Either links the page chain back to the global pool,
285  * or returns all pages to the system. */
286 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
287 {
288         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
289         int i;
290
291         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
292                 i = page_chain_free(page);
293         else {
294                 struct page *tmp;
295                 tmp = page_chain_tail(page, &i);
296                 spin_lock(&drbd_pp_lock);
297                 page_chain_add(&drbd_pp_pool, page, tmp);
298                 drbd_pp_vacant += i;
299                 spin_unlock(&drbd_pp_lock);
300         }
301         i = atomic_sub_return(i, a);
302         if (i < 0)
303                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
304                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
305         wake_up(&drbd_pp_wait);
306 }
307
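/*
 * Illustrative sketch (not part of the original file): the typical
 * allocate/use/release pairing for the page pool helpers above.
 * example_use_pp_pool() and its page count are made up.
 */
static inline int example_use_pp_pool(struct drbd_conf *mdev, unsigned int nr)
{
	struct page *chain;

	/* may block and retry until max_buffers allows it, see drbd_alloc_pages() */
	chain = drbd_alloc_pages(mdev, nr, true);
	if (!chain)
		return -ENOMEM;	/* only if interrupted by a signal */

	/* ... kmap()/kunmap() the pages and fill or consume them here ... */

	/* 0: account against pp_in_use rather than pp_in_use_by_net */
	drbd_free_pages(mdev, chain, 0);
	return 0;
}
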
308 /*
309 You need to hold the req_lock:
310  _drbd_wait_ee_list_empty()
311
312 You must not have the req_lock:
313  drbd_free_peer_req()
314  drbd_alloc_peer_req()
315  drbd_free_peer_reqs()
316  drbd_ee_fix_bhs()
317  drbd_finish_peer_reqs()
318  drbd_clear_done_ee()
319  drbd_wait_ee_list_empty()
320 */
321
322 struct drbd_peer_request *
323 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
324                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
325 {
326         struct drbd_peer_request *peer_req;
327         struct page *page;
328         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
329
330         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
331                 return NULL;
332
333         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
334         if (!peer_req) {
335                 if (!(gfp_mask & __GFP_NOWARN))
336                         dev_err(DEV, "%s: allocation failed\n", __func__);
337                 return NULL;
338         }
339
340         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
341         if (!page)
342                 goto fail;
343
344         drbd_clear_interval(&peer_req->i);
345         peer_req->i.size = data_size;
346         peer_req->i.sector = sector;
347         peer_req->i.local = false;
348         peer_req->i.waiting = false;
349
350         peer_req->epoch = NULL;
351         peer_req->w.mdev = mdev;
352         peer_req->pages = page;
353         atomic_set(&peer_req->pending_bios, 0);
354         peer_req->flags = 0;
355         /*
356          * The block_id is opaque to the receiver.  It is not endianness
357          * converted, and sent back to the sender unchanged.
358          */
359         peer_req->block_id = id;
360
361         return peer_req;
362
363  fail:
364         mempool_free(peer_req, drbd_ee_mempool);
365         return NULL;
366 }
367
368 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
369                        int is_net)
370 {
371         if (peer_req->flags & EE_HAS_DIGEST)
372                 kfree(peer_req->digest);
373         drbd_free_pages(mdev, peer_req->pages, is_net);
374         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
375         D_ASSERT(drbd_interval_empty(&peer_req->i));
376         mempool_free(peer_req, drbd_ee_mempool);
377 }
378
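/*
 * Illustrative sketch (not part of the original file): a peer request and its
 * page chain are allocated and released as a pair.  ID_SYNCER, sector 0 and
 * the 4096 byte size are placeholders; a real caller also holds a local disk
 * reference (__must_hold(local)).
 */
static inline int example_peer_req_roundtrip(struct drbd_conf *mdev)
{
	struct drbd_peer_request *peer_req;

	peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER, 0, 4096, GFP_NOIO);
	if (!peer_req)
		return -ENOMEM;

	/* ... receive into peer_req->pages, submit, wait for completion ... */

	__drbd_free_peer_req(mdev, peer_req, 0);	/* 0: not on the net_ee list */
	return 0;
}
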
379 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
380 {
381         LIST_HEAD(work_list);
382         struct drbd_peer_request *peer_req, *t;
383         int count = 0;
384         int is_net = list == &mdev->net_ee;
385
386         spin_lock_irq(&mdev->tconn->req_lock);
387         list_splice_init(list, &work_list);
388         spin_unlock_irq(&mdev->tconn->req_lock);
389
390         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
391                 __drbd_free_peer_req(mdev, peer_req, is_net);
392                 count++;
393         }
394         return count;
395 }
396
397 /*
398  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
399  */
400 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
401 {
402         LIST_HEAD(work_list);
403         LIST_HEAD(reclaimed);
404         struct drbd_peer_request *peer_req, *t;
405         int err = 0;
406
407         spin_lock_irq(&mdev->tconn->req_lock);
408         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
409         list_splice_init(&mdev->done_ee, &work_list);
410         spin_unlock_irq(&mdev->tconn->req_lock);
411
412         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
413                 drbd_free_net_peer_req(mdev, peer_req);
414
415         /* possible callbacks here:
416          * e_end_block, e_end_resync_block, and e_send_discard_write.
417          * all ignore the last argument.
418          */
419         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
420                 int err2;
421
422                 /* list_del not necessary, next/prev members not touched */
423                 err2 = peer_req->w.cb(&peer_req->w, !!err);
424                 if (!err)
425                         err = err2;
426                 drbd_free_peer_req(mdev, peer_req);
427         }
428         wake_up(&mdev->ee_wait);
429
430         return err;
431 }
432
433 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
434                                      struct list_head *head)
435 {
436         DEFINE_WAIT(wait);
437
438         /* avoids spin_lock/unlock
439          * and calling prepare_to_wait in the fast path */
440         while (!list_empty(head)) {
441                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
442                 spin_unlock_irq(&mdev->tconn->req_lock);
443                 io_schedule();
444                 finish_wait(&mdev->ee_wait, &wait);
445                 spin_lock_irq(&mdev->tconn->req_lock);
446         }
447 }
448
449 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
450                                     struct list_head *head)
451 {
452         spin_lock_irq(&mdev->tconn->req_lock);
453         _drbd_wait_ee_list_empty(mdev, head);
454         spin_unlock_irq(&mdev->tconn->req_lock);
455 }
456
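/*
 * Illustrative sketch (not part of the original file): the locking rules from
 * the comment block above drbd_alloc_peer_req() in practice.  The underscore
 * variant expects req_lock to be held, the wrapper takes it itself;
 * example_flush_ee_lists() is a made-up caller.
 */
static inline void example_flush_ee_lists(struct drbd_conf *mdev)
{
	/* without req_lock held: use the wrapper */
	drbd_wait_ee_list_empty(mdev, &mdev->active_ee);

	/* with req_lock already held: use the underscore variant */
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);
}
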
457 /* see also kernel_accept(), which is only present since 2.6.18;
458  * we also want to log exactly which part of it failed */
459 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
460 {
461         struct sock *sk = sock->sk;
462         int err = 0;
463
464         *what = "listen";
465         err = sock->ops->listen(sock, 5);
466         if (err < 0)
467                 goto out;
468
469         *what = "sock_create_lite";
470         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
471                                newsock);
472         if (err < 0)
473                 goto out;
474
475         *what = "accept";
476         err = sock->ops->accept(sock, *newsock, 0);
477         if (err < 0) {
478                 sock_release(*newsock);
479                 *newsock = NULL;
480                 goto out;
481         }
482         (*newsock)->ops  = sock->ops;
483
484 out:
485         return err;
486 }
487
488 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
489 {
490         mm_segment_t oldfs;
491         struct kvec iov = {
492                 .iov_base = buf,
493                 .iov_len = size,
494         };
495         struct msghdr msg = {
496                 .msg_iovlen = 1,
497                 .msg_iov = (struct iovec *)&iov,
498                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
499         };
500         int rv;
501
502         oldfs = get_fs();
503         set_fs(KERNEL_DS);
504         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
505         set_fs(oldfs);
506
507         return rv;
508 }
509
510 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
511 {
512         mm_segment_t oldfs;
513         struct kvec iov = {
514                 .iov_base = buf,
515                 .iov_len = size,
516         };
517         struct msghdr msg = {
518                 .msg_iovlen = 1,
519                 .msg_iov = (struct iovec *)&iov,
520                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
521         };
522         int rv;
523
524         oldfs = get_fs();
525         set_fs(KERNEL_DS);
526
527         for (;;) {
528                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
529                 if (rv == size)
530                         break;
531
532                 /* Note:
533                  * ECONNRESET   other side closed the connection
534                  * ERESTARTSYS  (on  sock) we got a signal
535                  */
536
537                 if (rv < 0) {
538                         if (rv == -ECONNRESET)
539                                 conn_info(tconn, "sock was reset by peer\n");
540                         else if (rv != -ERESTARTSYS)
541                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
542                         break;
543                 } else if (rv == 0) {
544                         conn_info(tconn, "sock was shut down by peer\n");
545                         break;
546                 } else  {
547                         /* signal came in, or peer/link went down,
548                          * after we read a partial message
549                          */
550                         /* D_ASSERT(signal_pending(current)); */
551                         break;
552                 }
553         }
554
555         set_fs(oldfs);
556
557         if (rv != size)
558                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
559
560         return rv;
561 }
562
563 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
564 {
565         int err;
566
567         err = drbd_recv(tconn, buf, size);
568         if (err != size) {
569                 if (err >= 0)
570                         err = -EIO;
571         } else
572                 err = 0;
573         return err;
574 }
575
576 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
577 {
578         int err;
579
580         err = drbd_recv_all(tconn, buf, size);
581         if (err && !signal_pending(current))
582                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
583         return err;
584 }
585
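/*
 * Illustrative sketch (not part of the original file): the three receive
 * helpers above form a small hierarchy; packet handlers normally use
 * drbd_recv_all_warn() so short reads are logged unless caused by a signal.
 * example_recv_be64() is a made-up helper.
 */
static inline int example_recv_be64(struct drbd_tconn *tconn, u64 *val)
{
	__be64 wire;
	int err;

	err = drbd_recv_all_warn(tconn, &wire, sizeof(wire));
	if (!err)
		*val = be64_to_cpu(wire);
	return err;
}
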
586 /* quoting tcp(7):
587  *   On individual connections, the socket buffer size must be set prior to the
588  *   listen(2) or connect(2) calls in order to have it take effect.
589  * This is our wrapper to do so.
590  */
591 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
592                 unsigned int rcv)
593 {
594         /* open coded SO_SNDBUF, SO_RCVBUF */
595         if (snd) {
596                 sock->sk->sk_sndbuf = snd;
597                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
598         }
599         if (rcv) {
600                 sock->sk->sk_rcvbuf = rcv;
601                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
602         }
603 }
604
605 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
606 {
607         const char *what;
608         struct socket *sock;
609         struct sockaddr_in6 src_in6;
610         int err;
611         int disconnect_on_error = 1;
612
613         if (!get_net_conf(tconn))
614                 return NULL;
615
616         what = "sock_create_kern";
617         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
618                 SOCK_STREAM, IPPROTO_TCP, &sock);
619         if (err < 0) {
620                 sock = NULL;
621                 goto out;
622         }
623
624         sock->sk->sk_rcvtimeo =
625         sock->sk->sk_sndtimeo =  tconn->net_conf->try_connect_int*HZ;
626         drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
627                         tconn->net_conf->rcvbuf_size);
628
629        /* explicitly bind to the configured IP as source IP
630         *  for the outgoing connections.
631         *  This is needed for multihomed hosts and to be
632         *  able to use lo: interfaces for drbd.
633         * Make sure to use 0 as port number, so linux selects
634         *  a free one dynamically.
635         */
636         memcpy(&src_in6, tconn->net_conf->my_addr,
637                min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
638         if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
639                 src_in6.sin6_port = 0;
640         else
641                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643         what = "bind before connect";
644         err = sock->ops->bind(sock,
645                               (struct sockaddr *) &src_in6,
646                               tconn->net_conf->my_addr_len);
647         if (err < 0)
648                 goto out;
649
650         /* connect may fail, peer not yet available.
651          * stay C_WF_CONNECTION, don't go Disconnecting! */
652         disconnect_on_error = 0;
653         what = "connect";
654         err = sock->ops->connect(sock,
655                                  (struct sockaddr *)tconn->net_conf->peer_addr,
656                                  tconn->net_conf->peer_addr_len, 0);
657
658 out:
659         if (err < 0) {
660                 if (sock) {
661                         sock_release(sock);
662                         sock = NULL;
663                 }
664                 switch (-err) {
665                         /* timeout, busy, signal pending */
666                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667                 case EINTR: case ERESTARTSYS:
668                         /* peer not (yet) available, network problem */
669                 case ECONNREFUSED: case ENETUNREACH:
670                 case EHOSTDOWN:    case EHOSTUNREACH:
671                         disconnect_on_error = 0;
672                         break;
673                 default:
674                         conn_err(tconn, "%s failed, err = %d\n", what, err);
675                 }
676                 if (disconnect_on_error)
677                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
678         }
679         put_net_conf(tconn);
680         return sock;
681 }
682
683 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
684 {
685         int timeo, err;
686         struct socket *s_estab = NULL, *s_listen;
687         const char *what;
688
689         if (!get_net_conf(tconn))
690                 return NULL;
691
692         what = "sock_create_kern";
693         err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
694                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
695         if (err) {
696                 s_listen = NULL;
697                 goto out;
698         }
699
700         timeo = tconn->net_conf->try_connect_int * HZ;
701         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
702
703         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
704         s_listen->sk->sk_rcvtimeo = timeo;
705         s_listen->sk->sk_sndtimeo = timeo;
706         drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
707                         tconn->net_conf->rcvbuf_size);
708
709         what = "bind before listen";
710         err = s_listen->ops->bind(s_listen,
711                               (struct sockaddr *) tconn->net_conf->my_addr,
712                               tconn->net_conf->my_addr_len);
713         if (err < 0)
714                 goto out;
715
716         err = drbd_accept(&what, s_listen, &s_estab);
717
718 out:
719         if (s_listen)
720                 sock_release(s_listen);
721         if (err < 0) {
722                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
723                         conn_err(tconn, "%s failed, err = %d\n", what, err);
724                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
725                 }
726         }
727         put_net_conf(tconn);
728
729         return s_estab;
730 }
731
732 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
733
734 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
735                              enum drbd_packet cmd)
736 {
737         if (!conn_prepare_command(tconn, sock))
738                 return -EIO;
739         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
740 }
741
742 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
743 {
744         unsigned int header_size = drbd_header_size(tconn);
745         struct packet_info pi;
746         int err;
747
748         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
749         if (err != header_size) {
750                 if (err >= 0)
751                         err = -EIO;
752                 return err;
753         }
754         err = decode_header(tconn, tconn->data.rbuf, &pi);
755         if (err)
756                 return err;
757         return pi.cmd;
758 }
759
760 /**
761  * drbd_socket_okay() - Free the socket if its connection is not okay
762  * @sock:       pointer to the pointer to the socket.
763  */
764 static int drbd_socket_okay(struct socket **sock)
765 {
766         int rr;
767         char tb[4];
768
769         if (!*sock)
770                 return false;
771
772         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
773
774         if (rr > 0 || rr == -EAGAIN) {
775                 return true;
776         } else {
777                 sock_release(*sock);
778                 *sock = NULL;
779                 return false;
780         }
781 }
782 /* Gets called if a connection is established, or if a new minor gets created
783    in a connection */
784 int drbd_connected(int vnr, void *p, void *data)
785 {
786         struct drbd_conf *mdev = (struct drbd_conf *)p;
787         int err;
788
789         atomic_set(&mdev->packet_seq, 0);
790         mdev->peer_seq = 0;
791
792         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
793                 &mdev->tconn->cstate_mutex :
794                 &mdev->own_state_mutex;
795
796         err = drbd_send_sync_param(mdev);
797         if (!err)
798                 err = drbd_send_sizes(mdev, 0, 0);
799         if (!err)
800                 err = drbd_send_uuids(mdev);
801         if (!err)
802                 err = drbd_send_state(mdev);
803         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
804         clear_bit(RESIZE_PENDING, &mdev->flags);
805         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
806         return err;
807 }
808
809 /*
810  * return values:
811  *   1 yes, we have a valid connection
812  *   0 oops, did not work out, please try again
813  *  -1 peer talks different language,
814  *     no point in trying again, please go standalone.
815  *  -2 We do not have a network config...
816  */
817 static int drbd_connect(struct drbd_tconn *tconn)
818 {
819         struct socket *sock, *msock;
820         int try, h, ok;
821
822         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
823                 return -2;
824
825         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
826
827         /* Assume that the peer only understands protocol 80 until we know better.  */
828         tconn->agreed_pro_version = 80;
829
830         do {
831                 struct socket *s;
832
833                 for (try = 0;;) {
834                         /* 3 tries, this should take less than a second! */
835                         s = drbd_try_connect(tconn);
836                         if (s || ++try >= 3)
837                                 break;
838                         /* give the other side time to call bind() & listen() */
839                         schedule_timeout_interruptible(HZ / 10);
840                 }
841
842                 if (s) {
843                         if (!tconn->data.socket) {
844                                 tconn->data.socket = s;
845                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
846                         } else if (!tconn->meta.socket) {
847                                 tconn->meta.socket = s;
848                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
849                         } else {
850                                 conn_err(tconn, "Logic error in drbd_connect()\n");
851                                 goto out_release_sockets;
852                         }
853                 }
854
855                 if (tconn->data.socket && tconn->meta.socket) {
856                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
857                         ok = drbd_socket_okay(&tconn->data.socket);
858                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
859                         if (ok)
860                                 break;
861                 }
862
863 retry:
864                 s = drbd_wait_for_connect(tconn);
865                 if (s) {
866                         try = receive_first_packet(tconn, s);
867                         drbd_socket_okay(&tconn->data.socket);
868                         drbd_socket_okay(&tconn->meta.socket);
869                         switch (try) {
870                         case P_INITIAL_DATA:
871                                 if (tconn->data.socket) {
872                                         conn_warn(tconn, "initial packet S crossed\n");
873                                         sock_release(tconn->data.socket);
874                                 }
875                                 tconn->data.socket = s;
876                                 break;
877                         case P_INITIAL_META:
878                                 if (tconn->meta.socket) {
879                                         conn_warn(tconn, "initial packet M crossed\n");
880                                         sock_release(tconn->meta.socket);
881                                 }
882                                 tconn->meta.socket = s;
883                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
884                                 break;
885                         default:
886                                 conn_warn(tconn, "Error receiving initial packet\n");
887                                 sock_release(s);
888                                 if (random32() & 1)
889                                         goto retry;
890                         }
891                 }
892
893                 if (tconn->cstate <= C_DISCONNECTING)
894                         goto out_release_sockets;
895                 if (signal_pending(current)) {
896                         flush_signals(current);
897                         smp_rmb();
898                         if (get_t_state(&tconn->receiver) == EXITING)
899                                 goto out_release_sockets;
900                 }
901
902                 if (tconn->data.socket && tconn->meta.socket) {
903                         ok = drbd_socket_okay(&tconn->data.socket);
904                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
905                         if (ok)
906                                 break;
907                 }
908         } while (1);
909
910         sock  = tconn->data.socket;
911         msock = tconn->meta.socket;
912
913         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
914         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
915
916         sock->sk->sk_allocation = GFP_NOIO;
917         msock->sk->sk_allocation = GFP_NOIO;
918
919         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
920         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
921
922         /* NOT YET ...
923          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
924          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
925          * first set it to the P_CONNECTION_FEATURES timeout,
926          * which we set to 4x the configured ping_timeout. */
927         sock->sk->sk_sndtimeo =
928         sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
929
930         msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
931         msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
932
933         /* we don't want delays.
934          * we use TCP_CORK where appropriate, though */
935         drbd_tcp_nodelay(sock);
936         drbd_tcp_nodelay(msock);
937
938         tconn->last_received = jiffies;
939
940         h = drbd_do_features(tconn);
941         if (h <= 0)
942                 return h;
943
944         if (tconn->cram_hmac_tfm) {
945                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
946                 switch (drbd_do_auth(tconn)) {
947                 case -1:
948                         conn_err(tconn, "Authentication of peer failed\n");
949                         return -1;
950                 case 0:
951                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
952                         return 0;
953                 }
954         }
955
956         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
957                 return 0;
958
959         sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
960         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
961
962         drbd_thread_start(&tconn->asender);
963
964         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
965                 return -1;
966
967         down_read(&drbd_cfg_rwsem);
968         h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
969         up_read(&drbd_cfg_rwsem);
970         return h;
971
972 out_release_sockets:
973         if (tconn->data.socket) {
974                 sock_release(tconn->data.socket);
975                 tconn->data.socket = NULL;
976         }
977         if (tconn->meta.socket) {
978                 sock_release(tconn->meta.socket);
979                 tconn->meta.socket = NULL;
980         }
981         return -1;
982 }
983
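/*
 * Illustrative sketch (not part of the original file): how the return codes
 * documented above drbd_connect() are meant to be consumed.  The real caller
 * is the receiver thread's main function further down in this file;
 * example_connect_until_done() is a made-up stand-in.
 */
static inline int example_connect_until_done(struct drbd_tconn *tconn)
{
	int h;

	do {
		h = drbd_connect(tconn);
		if (h == 0)	/* did not work out, try again */
			schedule_timeout_interruptible(HZ);
	} while (h == 0);

	return h < 0 ? h : 0;	/* -1: wrong protocol, -2: no net config */
}
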
984 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
985 {
986         unsigned int header_size = drbd_header_size(tconn);
987
988         if (header_size == sizeof(struct p_header100) &&
989             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
990                 struct p_header100 *h = header;
991                 if (h->pad != 0) {
992                         conn_err(tconn, "Header padding is not zero\n");
993                         return -EINVAL;
994                 }
995                 pi->vnr = be16_to_cpu(h->volume);
996                 pi->cmd = be16_to_cpu(h->command);
997                 pi->size = be32_to_cpu(h->length);
998         } else if (header_size == sizeof(struct p_header95) &&
999                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1000                 struct p_header95 *h = header;
1001                 pi->cmd = be16_to_cpu(h->command);
1002                 pi->size = be32_to_cpu(h->length);
1003                 pi->vnr = 0;
1004         } else if (header_size == sizeof(struct p_header80) &&
1005                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1006                 struct p_header80 *h = header;
1007                 pi->cmd = be16_to_cpu(h->command);
1008                 pi->size = be16_to_cpu(h->length);
1009                 pi->vnr = 0;
1010         } else {
1011                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1012                          be32_to_cpu(*(__be32 *)header),
1013                          tconn->agreed_pro_version);
1014                 return -EINVAL;
1015         }
1016         pi->data = header + header_size;
1017         return 0;
1018 }
1019
1020 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1021 {
1022         void *buffer = tconn->data.rbuf;
1023         int err;
1024
1025         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1026         if (err)
1027                 return err;
1028
1029         err = decode_header(tconn, buffer, pi);
1030         tconn->last_received = jiffies;
1031
1032         return err;
1033 }
1034
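/*
 * Illustrative sketch (not part of the original file): what a packet handler
 * sees once drbd_recv_header()/decode_header() have run; pi.data already
 * points past the header into the receive buffer.  example_peek_packet() is a
 * made-up name.
 */
static inline int example_peek_packet(struct drbd_tconn *tconn)
{
	struct packet_info pi;
	int err;

	err = drbd_recv_header(tconn, &pi);
	if (err)
		return err;

	conn_info(tconn, "volume %u, command %d, %u byte payload\n",
		  pi.vnr, pi.cmd, pi.size);
	return 0;
}
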
1035 static void drbd_flush(struct drbd_conf *mdev)
1036 {
1037         int rv;
1038
1039         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1040                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1041                                         NULL);
1042                 if (rv) {
1043                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1044                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1045                          * don't try again for ANY return value != 0
1046                          * if (rv == -EOPNOTSUPP) */
1047                         drbd_bump_write_ordering(mdev, WO_drain_io);
1048                 }
1049                 put_ldev(mdev);
1050         }
1051 }
1052
1053 /**
1054  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1055  * @mdev:       DRBD device.
1056  * @epoch:      Epoch object.
1057  * @ev:         Epoch event.
1058  */
1059 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1060                                                struct drbd_epoch *epoch,
1061                                                enum epoch_event ev)
1062 {
1063         int epoch_size;
1064         struct drbd_epoch *next_epoch;
1065         enum finish_epoch rv = FE_STILL_LIVE;
1066
1067         spin_lock(&mdev->epoch_lock);
1068         do {
1069                 next_epoch = NULL;
1070
1071                 epoch_size = atomic_read(&epoch->epoch_size);
1072
1073                 switch (ev & ~EV_CLEANUP) {
1074                 case EV_PUT:
1075                         atomic_dec(&epoch->active);
1076                         break;
1077                 case EV_GOT_BARRIER_NR:
1078                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1079                         break;
1080                 case EV_BECAME_LAST:
1081                         /* nothing to do */
1082                         break;
1083                 }
1084
1085                 if (epoch_size != 0 &&
1086                     atomic_read(&epoch->active) == 0 &&
1087                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1088                         if (!(ev & EV_CLEANUP)) {
1089                                 spin_unlock(&mdev->epoch_lock);
1090                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1091                                 spin_lock(&mdev->epoch_lock);
1092                         }
1093                         dec_unacked(mdev);
1094
1095                         if (mdev->current_epoch != epoch) {
1096                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1097                                 list_del(&epoch->list);
1098                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1099                                 mdev->epochs--;
1100                                 kfree(epoch);
1101
1102                                 if (rv == FE_STILL_LIVE)
1103                                         rv = FE_DESTROYED;
1104                         } else {
1105                                 epoch->flags = 0;
1106                                 atomic_set(&epoch->epoch_size, 0);
1107                                 /* atomic_set(&epoch->active, 0); is already zero */
1108                                 if (rv == FE_STILL_LIVE)
1109                                         rv = FE_RECYCLED;
1110                                 wake_up(&mdev->ee_wait);
1111                         }
1112                 }
1113
1114                 if (!next_epoch)
1115                         break;
1116
1117                 epoch = next_epoch;
1118         } while (1);
1119
1120         spin_unlock(&mdev->epoch_lock);
1121
1122         return rv;
1123 }
1124
1125 /**
1126  * drbd_bump_write_ordering() - Fall back to another write ordering method
1127  * @mdev:       DRBD device.
1128  * @wo:         Write ordering method to try.
1129  */
1130 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1131 {
1132         enum write_ordering_e pwo;
1133         static char *write_ordering_str[] = {
1134                 [WO_none] = "none",
1135                 [WO_drain_io] = "drain",
1136                 [WO_bdev_flush] = "flush",
1137         };
1138
1139         pwo = mdev->write_ordering;
1140         wo = min(pwo, wo);
1141         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1142                 wo = WO_drain_io;
1143         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1144                 wo = WO_none;
1145         mdev->write_ordering = wo;
1146         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1147                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1148 }
1149
1150 /**
1151  * drbd_submit_peer_request() - Submit a peer request to the local backing device
1152  * @mdev:       DRBD device.
1153  * @peer_req:   peer request
1154  * @rw:         flag field, see bio->bi_rw
1155  *
1156  * May spread the pages to multiple bios,
1157  * depending on bio_add_page restrictions.
1158  *
1159  * Returns 0 if all bios have been submitted,
1160  * -ENOMEM if we could not allocate enough bios,
1161  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1162  *  single page to an empty bio (which should never happen and likely indicates
1163  *  that the lower level IO stack is in some way broken). This has been observed
1164  *  on certain Xen deployments.
1165  */
1166 /* TODO allocate from our own bio_set. */
1167 int drbd_submit_peer_request(struct drbd_conf *mdev,
1168                              struct drbd_peer_request *peer_req,
1169                              const unsigned rw, const int fault_type)
1170 {
1171         struct bio *bios = NULL;
1172         struct bio *bio;
1173         struct page *page = peer_req->pages;
1174         sector_t sector = peer_req->i.sector;
1175         unsigned ds = peer_req->i.size;
1176         unsigned n_bios = 0;
1177         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1178         int err = -ENOMEM;
1179
1180         /* In most cases, we will only need one bio.  But in case the lower
1181          * level restrictions happen to be different at this offset on this
1182          * side than those of the sending peer, we may need to submit the
1183          * request in more than one bio.
1184          *
1185          * Plain bio_alloc is good enough here, this is no DRBD internally
1186          * generated bio, but a bio allocated on behalf of the peer.
1187          */
1188 next_bio:
1189         bio = bio_alloc(GFP_NOIO, nr_pages);
1190         if (!bio) {
1191                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1192                 goto fail;
1193         }
1194         /* > peer_req->i.sector, unless this is the first bio */
1195         bio->bi_sector = sector;
1196         bio->bi_bdev = mdev->ldev->backing_bdev;
1197         bio->bi_rw = rw;
1198         bio->bi_private = peer_req;
1199         bio->bi_end_io = drbd_peer_request_endio;
1200
1201         bio->bi_next = bios;
1202         bios = bio;
1203         ++n_bios;
1204
1205         page_chain_for_each(page) {
1206                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1207                 if (!bio_add_page(bio, page, len, 0)) {
1208                         /* A single page must always be possible!
1209                          * But in case it fails anyways,
1210                          * we deal with it, and complain (below). */
1211                         if (bio->bi_vcnt == 0) {
1212                                 dev_err(DEV,
1213                                         "bio_add_page failed for len=%u, "
1214                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1215                                         len, (unsigned long long)bio->bi_sector);
1216                                 err = -ENOSPC;
1217                                 goto fail;
1218                         }
1219                         goto next_bio;
1220                 }
1221                 ds -= len;
1222                 sector += len >> 9;
1223                 --nr_pages;
1224         }
1225         D_ASSERT(page == NULL);
1226         D_ASSERT(ds == 0);
1227
1228         atomic_set(&peer_req->pending_bios, n_bios);
1229         do {
1230                 bio = bios;
1231                 bios = bios->bi_next;
1232                 bio->bi_next = NULL;
1233
1234                 drbd_generic_make_request(mdev, fault_type, bio);
1235         } while (bios);
1236         return 0;
1237
1238 fail:
1239         while (bios) {
1240                 bio = bios;
1241                 bios = bios->bi_next;
1242                 bio_put(bio);
1243         }
1244         return err;
1245 }
1246
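/*
 * Illustrative sketch (not part of the original file): the queue-and-submit
 * pattern that the data path below builds on top of
 * drbd_submit_peer_request().  example_queue_and_submit() is a made-up name
 * and error unwinding is omitted.
 */
static inline int example_queue_and_submit(struct drbd_conf *mdev,
					   struct drbd_peer_request *peer_req)
{
	peer_req->w.cb = e_end_block;	/* completion callback, asender context */

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	return drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_DT_WR);
}
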
1247 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1248                                              struct drbd_peer_request *peer_req)
1249 {
1250         struct drbd_interval *i = &peer_req->i;
1251
1252         drbd_remove_interval(&mdev->write_requests, i);
1253         drbd_clear_interval(i);
1254
1255         /* Wake up any processes waiting for this peer request to complete.  */
1256         if (i->waiting)
1257                 wake_up(&mdev->misc_wait);
1258 }
1259
1260 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1261 {
1262         struct drbd_conf *mdev;
1263         int rv;
1264         struct p_barrier *p = pi->data;
1265         struct drbd_epoch *epoch;
1266
1267         mdev = vnr_to_mdev(tconn, pi->vnr);
1268         if (!mdev)
1269                 return -EIO;
1270
1271         inc_unacked(mdev);
1272
1273         mdev->current_epoch->barrier_nr = p->barrier;
1274         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1275
1276         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1277          * the activity log, which means it would not be resynced in case the
1278          * R_PRIMARY crashes now.
1279          * Therefore we must send the barrier_ack after the barrier request was
1280          * completed. */
1281         switch (mdev->write_ordering) {
1282         case WO_none:
1283                 if (rv == FE_RECYCLED)
1284                         return 0;
1285
1286                 /* receiver context, in the writeout path of the other node.
1287                  * avoid potential distributed deadlock */
1288                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1289                 if (epoch)
1290                         break;
1291                 else
1292                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1293                         /* Fall through */
1294
1295         case WO_bdev_flush:
1296         case WO_drain_io:
1297                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1298                 drbd_flush(mdev);
1299
1300                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1301                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1302                         if (epoch)
1303                                 break;
1304                 }
1305
1306                 epoch = mdev->current_epoch;
1307                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1308
1309                 D_ASSERT(atomic_read(&epoch->active) == 0);
1310                 D_ASSERT(epoch->flags == 0);
1311
1312                 return 0;
1313         default:
1314                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1315                 return -EIO;
1316         }
1317
1318         epoch->flags = 0;
1319         atomic_set(&epoch->epoch_size, 0);
1320         atomic_set(&epoch->active, 0);
1321
1322         spin_lock(&mdev->epoch_lock);
1323         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1324                 list_add(&epoch->list, &mdev->current_epoch->list);
1325                 mdev->current_epoch = epoch;
1326                 mdev->epochs++;
1327         } else {
1328                 /* The current_epoch got recycled while we allocated this one... */
1329                 kfree(epoch);
1330         }
1331         spin_unlock(&mdev->epoch_lock);
1332
1333         return 0;
1334 }
1335
1336 /* used from receive_RSDataReply (recv_resync_read)
1337  * and from receive_Data */
1338 static struct drbd_peer_request *
1339 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1340               int data_size) __must_hold(local)
1341 {
1342         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1343         struct drbd_peer_request *peer_req;
1344         struct page *page;
1345         int dgs, ds, err;
1346         void *dig_in = mdev->tconn->int_dig_in;
1347         void *dig_vv = mdev->tconn->int_dig_vv;
1348         unsigned long *data;
1349
1350         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1351                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1352
1353         if (dgs) {
1354                 /*
1355                  * FIXME: Receive the incoming digest into the receive buffer
1356                  *        here, together with its struct p_data?
1357                  */
1358                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1359                 if (err)
1360                         return NULL;
1361         }
1362
1363         data_size -= dgs;
1364
1365         if (!expect(data_size != 0))
1366                 return NULL;
1367         if (!expect(IS_ALIGNED(data_size, 512)))
1368                 return NULL;
1369         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1370                 return NULL;
1371
1372         /* even though we trust our peer,
1373          * we sometimes have to double check. */
1374         if (sector + (data_size>>9) > capacity) {
1375                 dev_err(DEV, "request from peer beyond end of local disk: "
1376                         "capacity: %llus < sector: %llus + size: %u\n",
1377                         (unsigned long long)capacity,
1378                         (unsigned long long)sector, data_size);
1379                 return NULL;
1380         }
1381
1382         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1383          * "criss-cross" setup, that might cause write-out on some other DRBD,
1384          * which in turn might block on the other node at this very place.  */
1385         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1386         if (!peer_req)
1387                 return NULL;
1388
1389         ds = data_size;
1390         page = peer_req->pages;
1391         page_chain_for_each(page) {
1392                 unsigned len = min_t(int, ds, PAGE_SIZE);
1393                 data = kmap(page);
1394                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1395                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1396                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1397                         data[0] = data[0] ^ (unsigned long)-1;
1398                 }
1399                 kunmap(page);
1400                 if (err) {
1401                         drbd_free_peer_req(mdev, peer_req);
1402                         return NULL;
1403                 }
1404                 ds -= len;
1405         }
1406
1407         if (dgs) {
1408                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1409                 if (memcmp(dig_in, dig_vv, dgs)) {
1410                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1411                                 (unsigned long long)sector, data_size);
1412                         drbd_free_peer_req(mdev, peer_req);
1413                         return NULL;
1414                 }
1415         }
1416         mdev->recv_cnt += data_size>>9;
1417         return peer_req;
1418 }
1419
1420 /* drbd_drain_block() just takes a data block
1421  * out of the socket input buffer, and discards it.
1422  */
1423 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1424 {
1425         struct page *page;
1426         int err = 0;
1427         void *data;
1428
1429         if (!data_size)
1430                 return 0;
1431
1432         page = drbd_alloc_pages(mdev, 1, 1);
1433
1434         data = kmap(page);
1435         while (data_size) {
1436                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1437
1438                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1439                 if (err)
1440                         break;
1441                 data_size -= len;
1442         }
1443         kunmap(page);
1444         drbd_free_pages(mdev, page, 0);
1445         return err;
1446 }
1447
1448 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1449                            sector_t sector, int data_size)
1450 {
1451         struct bio_vec *bvec;
1452         struct bio *bio;
1453         int dgs, err, i, expect;
1454         void *dig_in = mdev->tconn->int_dig_in;
1455         void *dig_vv = mdev->tconn->int_dig_vv;
1456
1457         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1458                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1459
1460         if (dgs) {
1461                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1462                 if (err)
1463                         return err;
1464         }
1465
1466         data_size -= dgs;
1467
1468         /* optimistically update recv_cnt.  if receiving fails below,
1469          * we disconnect anyways, and counters will be reset. */
1470         mdev->recv_cnt += data_size>>9;
1471
1472         bio = req->master_bio;
1473         D_ASSERT(sector == bio->bi_sector);
1474
1475         bio_for_each_segment(bvec, bio, i) {
1476                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1477                 expect = min_t(int, data_size, bvec->bv_len);
1478                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1479                 kunmap(bvec->bv_page);
1480                 if (err)
1481                         return err;
1482                 data_size -= expect;
1483         }
1484
1485         if (dgs) {
1486                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1487                 if (memcmp(dig_in, dig_vv, dgs)) {
1488                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1489                         return -EINVAL;
1490                 }
1491         }
1492
1493         D_ASSERT(data_size == 0);
1494         return 0;
1495 }
1496
1497 /*
1498  * e_end_resync_block() is called in asender context via
1499  * drbd_finish_peer_reqs().
1500  */
1501 static int e_end_resync_block(struct drbd_work *w, int unused)
1502 {
1503         struct drbd_peer_request *peer_req =
1504                 container_of(w, struct drbd_peer_request, w);
1505         struct drbd_conf *mdev = w->mdev;
1506         sector_t sector = peer_req->i.sector;
1507         int err;
1508
1509         D_ASSERT(drbd_interval_empty(&peer_req->i));
1510
1511         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1512                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1513                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1514         } else {
1515                 /* Record failure to sync */
1516                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1517
1518                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1519         }
1520         dec_unacked(mdev);
1521
1522         return err;
1523 }
1524
1525 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1526 {
1527         struct drbd_peer_request *peer_req;
1528
1529         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1530         if (!peer_req)
1531                 goto fail;
1532
1533         dec_rs_pending(mdev);
1534
1535         inc_unacked(mdev);
1536         /* corresponding dec_unacked() in e_end_resync_block(),
1537          * or in _drbd_clear_done_ee() */
1538
1539         peer_req->w.cb = e_end_resync_block;
1540
1541         spin_lock_irq(&mdev->tconn->req_lock);
1542         list_add(&peer_req->w.list, &mdev->sync_ee);
1543         spin_unlock_irq(&mdev->tconn->req_lock);
1544
1545         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1546         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1547                 return 0;
1548
1549         /* don't care for the reason here */
1550         dev_err(DEV, "submit failed, triggering re-connect\n");
1551         spin_lock_irq(&mdev->tconn->req_lock);
1552         list_del(&peer_req->w.list);
1553         spin_unlock_irq(&mdev->tconn->req_lock);
1554
1555         drbd_free_peer_req(mdev, peer_req);
1556 fail:
1557         put_ldev(mdev);
1558         return -EIO;
1559 }
1560
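/* Look up the request a peer packet refers to: the id is the pointer value of
 * our own request, echoed back by the peer.  It is only trusted after it has
 * been validated against the given interval tree (read or write requests). */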
1561 static struct drbd_request *
1562 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1563              sector_t sector, bool missing_ok, const char *func)
1564 {
1565         struct drbd_request *req;
1566
1567         /* Request object according to our peer */
1568         req = (struct drbd_request *)(unsigned long)id;
1569         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1570                 return req;
1571         if (!missing_ok) {
1572                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1573                         (unsigned long)id, (unsigned long long)sector);
1574         }
1575         return NULL;
1576 }
1577
1578 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1579 {
1580         struct drbd_conf *mdev;
1581         struct drbd_request *req;
1582         sector_t sector;
1583         int err;
1584         struct p_data *p = pi->data;
1585
1586         mdev = vnr_to_mdev(tconn, pi->vnr);
1587         if (!mdev)
1588                 return -EIO;
1589
1590         sector = be64_to_cpu(p->sector);
1591
1592         spin_lock_irq(&mdev->tconn->req_lock);
1593         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1594         spin_unlock_irq(&mdev->tconn->req_lock);
1595         if (unlikely(!req))
1596                 return -EIO;
1597
1598         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1599          * special casing it there for the various failure cases.
1600          * still no race with drbd_fail_pending_reads */
1601         err = recv_dless_read(mdev, req, sector, pi->size);
1602         if (!err)
1603                 req_mod(req, DATA_RECEIVED);
1604         /* else: nothing. handled from drbd_disconnect...
1605          * I don't think we may complete this just yet
1606          * in case we are "on-disconnect: freeze" */
1607
1608         return err;
1609 }
1610
1611 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1612 {
1613         struct drbd_conf *mdev;
1614         sector_t sector;
1615         int err;
1616         struct p_data *p = pi->data;
1617
1618         mdev = vnr_to_mdev(tconn, pi->vnr);
1619         if (!mdev)
1620                 return -EIO;
1621
1622         sector = be64_to_cpu(p->sector);
1623         D_ASSERT(p->block_id == ID_SYNCER);
1624
1625         if (get_ldev(mdev)) {
1626                 /* data is submitted to disk within recv_resync_read.
1627                  * corresponding put_ldev done below on error,
1628                  * or in drbd_peer_request_endio. */
1629                 err = recv_resync_read(mdev, sector, pi->size);
1630         } else {
1631                 if (__ratelimit(&drbd_ratelimit_state))
1632                         dev_err(DEV, "Can not write resync data to local disk.\n");
1633
1634                 err = drbd_drain_block(mdev, pi->size);
1635
1636                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1637         }
1638
1639         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1640
1641         return err;
1642 }
1643
1644 static int w_restart_write(struct drbd_work *w, int cancel)
1645 {
1646         struct drbd_request *req = container_of(w, struct drbd_request, w);
1647         struct drbd_conf *mdev = w->mdev;
1648         struct bio *bio;
1649         unsigned long start_time;
1650         unsigned long flags;
1651
1652         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1653         if (!expect(req->rq_state & RQ_POSTPONED)) {
1654                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1655                 return -EIO;
1656         }
1657         bio = req->master_bio;
1658         start_time = req->start_time;
1659         /* Postponed requests will not have their master_bio completed!  */
1660         __req_mod(req, DISCARD_WRITE, NULL);
1661         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1662
1663         while (__drbd_make_request(mdev, bio, start_time))
1664                 /* retry */ ;
1665         return 0;
1666 }
1667
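/* Requeue local writes that were postponed because they conflicted with the
 * peer write covering [sector, sector + size); called from e_end_block() once
 * that peer write has completed (EE_RESTART_REQUESTS). */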
1668 static void restart_conflicting_writes(struct drbd_conf *mdev,
1669                                        sector_t sector, int size)
1670 {
1671         struct drbd_interval *i;
1672         struct drbd_request *req;
1673
1674         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1675                 if (!i->local)
1676                         continue;
1677                 req = container_of(i, struct drbd_request, i);
1678                 if (req->rq_state & RQ_LOCAL_PENDING ||
1679                     !(req->rq_state & RQ_POSTPONED))
1680                         continue;
1681                 if (expect(list_empty(&req->w.list))) {
1682                         req->w.mdev = mdev;
1683                         req->w.cb = w_restart_write;
1684                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1685                 }
1686         }
1687 }
1688
1689 /*
1690  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1691  */
1692 static int e_end_block(struct drbd_work *w, int cancel)
1693 {
1694         struct drbd_peer_request *peer_req =
1695                 container_of(w, struct drbd_peer_request, w);
1696         struct drbd_conf *mdev = w->mdev;
1697         sector_t sector = peer_req->i.sector;
1698         int err = 0, pcmd;
1699
1700         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1701                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1702                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1703                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1704                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1705                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1706                         err = drbd_send_ack(mdev, pcmd, peer_req);
1707                         if (pcmd == P_RS_WRITE_ACK)
1708                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1709                 } else {
1710                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1711                         /* we expect it to be marked out of sync anyways...
1712                          * maybe assert this?  */
1713                 }
1714                 dec_unacked(mdev);
1715         }
1716         /* we delete from the conflict detection hash _after_ we sent out the
1717          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1718         if (mdev->tconn->net_conf->two_primaries) {
1719                 spin_lock_irq(&mdev->tconn->req_lock);
1720                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1721                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1722                 if (peer_req->flags & EE_RESTART_REQUESTS)
1723                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1724                 spin_unlock_irq(&mdev->tconn->req_lock);
1725         } else
1726                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1727
1728         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1729
1730         return err;
1731 }
1732
1733 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1734 {
1735         struct drbd_conf *mdev = w->mdev;
1736         struct drbd_peer_request *peer_req =
1737                 container_of(w, struct drbd_peer_request, w);
1738         int err;
1739
1740         err = drbd_send_ack(mdev, ack, peer_req);
1741         dec_unacked(mdev);
1742
1743         return err;
1744 }
1745
1746 static int e_send_discard_write(struct drbd_work *w, int unused)
1747 {
1748         return e_send_ack(w, P_DISCARD_WRITE);
1749 }
1750
1751 static int e_send_retry_write(struct drbd_work *w, int unused)
1752 {
1753         struct drbd_tconn *tconn = w->mdev->tconn;
1754
1755         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1756                              P_RETRY_WRITE : P_DISCARD_WRITE);
1757 }
1758
1759 static bool seq_greater(u32 a, u32 b)
1760 {
1761         /*
1762          * We assume 32-bit wrap-around here.
1763          * For 24-bit wrap-around, we would have to shift:
1764          *  a <<= 8; b <<= 8;
1765          */
1766         return (s32)a - (s32)b > 0;
1767 }
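
/* For example, with the signed difference above a sequence number that has
 * just wrapped still compares as newer:
 *   seq_greater(1, 0xffffffff): (s32)1 - (s32)0xffffffff == 2 > 0, i.e. true
 *   seq_greater(0xffffffff, 1): -2 > 0, i.e. false
 */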
1768
1769 static u32 seq_max(u32 a, u32 b)
1770 {
1771         return seq_greater(a, b) ? a : b;
1772 }
1773
1774 static bool need_peer_seq(struct drbd_conf *mdev)
1775 {
1776         struct drbd_tconn *tconn = mdev->tconn;
1777
1778         /*
1779          * We only need to keep track of the last packet_seq number of our peer
1780          * if we are in dual-primary mode and we have the discard flag set; see
1781          * handle_write_conflicts().
1782          */
1783         return tconn->net_conf->two_primaries &&
1784                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1785 }
1786
1787 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1788 {
1789         unsigned int newest_peer_seq;
1790
1791         if (need_peer_seq(mdev)) {
1792                 spin_lock(&mdev->peer_seq_lock);
1793                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1794                 mdev->peer_seq = newest_peer_seq;
1795                 spin_unlock(&mdev->peer_seq_lock);
1796                 /* wake up only if we actually changed mdev->peer_seq */
1797                 if (peer_seq == newest_peer_seq)
1798                         wake_up(&mdev->seq_wait);
1799         }
1800 }
1801
1802 /* Called from receive_Data.
1803  * Synchronize packets on sock with packets on msock.
1804  *
1805  * This is here so that even when a P_DATA packet traveling via sock overtakes
1806  * an Ack packet traveling on msock, they are still processed in the order
1807  * they were sent.
1808  *
1809  * Note: we don't care for Ack packets overtaking P_DATA packets.
1810  *
1811  * If peer_seq is larger than mdev->peer_seq, there are still
1812  * outstanding packets on the msock; we wait for them to arrive.
1813  * If this is the logically next packet, we update mdev->peer_seq
1814  * ourselves. Correctly handles 32-bit wrap-around.
1815  *
1816  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1817  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1818  * for the 24bit wrap (historical atomic_t guarantee on some archs), and
1819  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1820  *
1821  * returns 0 if we may process the packet, -ETIMEDOUT if we gave up waiting,
1822  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1823 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1824 {
1825         DEFINE_WAIT(wait);
1826         long timeout;
1827         int ret;
1828
1829         if (!need_peer_seq(mdev))
1830                 return 0;
1831
1832         spin_lock(&mdev->peer_seq_lock);
1833         for (;;) {
1834                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1835                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1836                         ret = 0;
1837                         break;
1838                 }
1839                 if (signal_pending(current)) {
1840                         ret = -ERESTARTSYS;
1841                         break;
1842                 }
1843                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1844                 spin_unlock(&mdev->peer_seq_lock);
1845                 timeout = mdev->tconn->net_conf->ping_timeo*HZ/10;
1846                 timeout = schedule_timeout(timeout);
1847                 spin_lock(&mdev->peer_seq_lock);
1848                 if (!timeout) {
1849                         ret = -ETIMEDOUT;
1850                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1851                         break;
1852                 }
1853         }
1854         spin_unlock(&mdev->peer_seq_lock);
1855         finish_wait(&mdev->seq_wait, &wait);
1856         return ret;
1857 }
1858
1859 /* See also bio_flags_to_wire().
1860  * We need to semantically map bio (REQ_*) flags to data packet (DP_*) flags
1861  * and back, since the peer may run a different kernel version. */
1862 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1863 {
1864         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1865                 (dpf & DP_FUA ? REQ_FUA : 0) |
1866                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1867                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1868 }
1869
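/* Give up on all POSTPONED local writes overlapping [sector, sector + size):
 * complete them as if they had been negatively acked.  Called from
 * handle_write_conflicts() when we stop waiting for a conflicting request. */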
1870 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1871                                     unsigned int size)
1872 {
1873         struct drbd_interval *i;
1874
1875     repeat:
1876         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1877                 struct drbd_request *req;
1878                 struct bio_and_error m;
1879
1880                 if (!i->local)
1881                         continue;
1882                 req = container_of(i, struct drbd_request, i);
1883                 if (!(req->rq_state & RQ_POSTPONED))
1884                         continue;
1885                 req->rq_state &= ~RQ_POSTPONED;
1886                 __req_mod(req, NEG_ACKED, &m);
1887                 spin_unlock_irq(&mdev->tconn->req_lock);
1888                 if (m.bio)
1889                         complete_master_bio(mdev, &m);
1890                 spin_lock_irq(&mdev->tconn->req_lock);
1891                 goto repeat;
1892         }
1893 }
1894
1895 static int handle_write_conflicts(struct drbd_conf *mdev,
1896                                   struct drbd_peer_request *peer_req)
1897 {
1898         struct drbd_tconn *tconn = mdev->tconn;
1899         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1900         sector_t sector = peer_req->i.sector;
1901         const unsigned int size = peer_req->i.size;
1902         struct drbd_interval *i;
1903         bool equal;
1904         int err;
1905
1906         /*
1907          * Inserting the peer request into the write_requests tree will prevent
1908          * new conflicting local requests from being added.
1909          */
1910         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1911
1912     repeat:
1913         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1914                 if (i == &peer_req->i)
1915                         continue;
1916
1917                 if (!i->local) {
1918                         /*
1919                          * Our peer has sent a conflicting remote request; this
1920                          * should not happen in a two-node setup.  Wait for the
1921                          * earlier peer request to complete.
1922                          */
1923                         err = drbd_wait_misc(mdev, i);
1924                         if (err)
1925                                 goto out;
1926                         goto repeat;
1927                 }
1928
1929                 equal = i->sector == sector && i->size == size;
1930                 if (resolve_conflicts) {
1931                         /*
1932                          * If the peer request is fully contained within the
1933                          * overlapping request, it can be discarded; otherwise,
1934                          * it will be retried once all overlapping requests
1935                          * have completed.
1936                          */
1937                         bool discard = i->sector <= sector && i->sector +
1938                                        (i->size >> 9) >= sector + (size >> 9);
1939
1940                         if (!equal)
1941                                 dev_alert(DEV, "Concurrent writes detected: "
1942                                                "local=%llus +%u, remote=%llus +%u, "
1943                                                "assuming %s came first\n",
1944                                           (unsigned long long)i->sector, i->size,
1945                                           (unsigned long long)sector, size,
1946                                           discard ? "local" : "remote");
1947
1948                         inc_unacked(mdev);
1949                         peer_req->w.cb = discard ? e_send_discard_write :
1950                                                    e_send_retry_write;
1951                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1952                         wake_asender(mdev->tconn);
1953
1954                         err = -ENOENT;
1955                         goto out;
1956                 } else {
1957                         struct drbd_request *req =
1958                                 container_of(i, struct drbd_request, i);
1959
1960                         if (!equal)
1961                                 dev_alert(DEV, "Concurrent writes detected: "
1962                                                "local=%llus +%u, remote=%llus +%u\n",
1963                                           (unsigned long long)i->sector, i->size,
1964                                           (unsigned long long)sector, size);
1965
1966                         if (req->rq_state & RQ_LOCAL_PENDING ||
1967                             !(req->rq_state & RQ_POSTPONED)) {
1968                                 /*
1969                                  * Wait for the node with the discard flag to
1970                                  * decide if this request will be discarded or
1971                                  * retried.  Requests that are discarded will
1972                                  * disappear from the write_requests tree.
1973                                  *
1974                                  * In addition, wait for the conflicting
1975                                  * request to finish locally before submitting
1976                                  * the conflicting peer request.
1977                                  */
1978                                 err = drbd_wait_misc(mdev, &req->i);
1979                                 if (err) {
1980                                         _conn_request_state(mdev->tconn,
1981                                                             NS(conn, C_TIMEOUT),
1982                                                             CS_HARD);
1983                                         fail_postponed_requests(mdev, sector, size);
1984                                         goto out;
1985                                 }
1986                                 goto repeat;
1987                         }
1988                         /*
1989                          * Remember to restart the conflicting requests after
1990                          * the new peer request has completed.
1991                          */
1992                         peer_req->flags |= EE_RESTART_REQUESTS;
1993                 }
1994         }
1995         err = 0;
1996
1997     out:
1998         if (err)
1999                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2000         return err;
2001 }
2002
2003 /* mirrored write */
2004 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2005 {
2006         struct drbd_conf *mdev;
2007         sector_t sector;
2008         struct drbd_peer_request *peer_req;
2009         struct p_data *p = pi->data;
2010         u32 peer_seq = be32_to_cpu(p->seq_num);
2011         int rw = WRITE;
2012         u32 dp_flags;
2013         int err;
2014
2015         mdev = vnr_to_mdev(tconn, pi->vnr);
2016         if (!mdev)
2017                 return -EIO;
2018
2019         if (!get_ldev(mdev)) {
2020                 int err2;
2021
2022                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2023                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2024                 atomic_inc(&mdev->current_epoch->epoch_size);
2025                 err2 = drbd_drain_block(mdev, pi->size);
2026                 if (!err)
2027                         err = err2;
2028                 return err;
2029         }
2030
2031         /*
2032          * Corresponding put_ldev done either below (on various errors), or in
2033          * drbd_peer_request_endio, if we successfully submit the data at the
2034          * end of this function.
2035          */
2036
2037         sector = be64_to_cpu(p->sector);
2038         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2039         if (!peer_req) {
2040                 put_ldev(mdev);
2041                 return -EIO;
2042         }
2043
2044         peer_req->w.cb = e_end_block;
2045
2046         dp_flags = be32_to_cpu(p->dp_flags);
2047         rw |= wire_flags_to_bio(mdev, dp_flags);
2048
2049         if (dp_flags & DP_MAY_SET_IN_SYNC)
2050                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2051
2052         spin_lock(&mdev->epoch_lock);
2053         peer_req->epoch = mdev->current_epoch;
2054         atomic_inc(&peer_req->epoch->epoch_size);
2055         atomic_inc(&peer_req->epoch->active);
2056         spin_unlock(&mdev->epoch_lock);
2057
2058         if (mdev->tconn->net_conf->two_primaries) {
2059                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2060                 if (err)
2061                         goto out_interrupted;
2062                 spin_lock_irq(&mdev->tconn->req_lock);
2063                 err = handle_write_conflicts(mdev, peer_req);
2064                 if (err) {
2065                         spin_unlock_irq(&mdev->tconn->req_lock);
2066                         if (err == -ENOENT) {
2067                                 put_ldev(mdev);
2068                                 return 0;
2069                         }
2070                         goto out_interrupted;
2071                 }
2072         } else
2073                 spin_lock_irq(&mdev->tconn->req_lock);
2074         list_add(&peer_req->w.list, &mdev->active_ee);
2075         spin_unlock_irq(&mdev->tconn->req_lock);
2076
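        /* For peers speaking a protocol version before 100, derive the ack
         * policy from the configured wire protocol: protocol C wants an ack
         * once the write reached stable storage (P_WRITE_ACK), protocol B an
         * ack on receive (P_RECV_ACK), protocol A no ack at all.  Peers with
         * protocol >= 100 set these dp_flags themselves. */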
2077         if (mdev->tconn->agreed_pro_version < 100) {
2078                 switch (mdev->tconn->net_conf->wire_protocol) {
2079                 case DRBD_PROT_C:
2080                         dp_flags |= DP_SEND_WRITE_ACK;
2081                         break;
2082                 case DRBD_PROT_B:
2083                         dp_flags |= DP_SEND_RECEIVE_ACK;
2084                         break;
2085                 }
2086         }
2087
2088         if (dp_flags & DP_SEND_WRITE_ACK) {
2089                 peer_req->flags |= EE_SEND_WRITE_ACK;
2090                 inc_unacked(mdev);
2091                 /* corresponding dec_unacked() in e_end_block(),
2092                  * or in _drbd_clear_done_ee() */
2093         }
2094
2095         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2096                 /* I really don't like it that the receiver thread
2097                  * sends on the msock, but anyways */
2098                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2099         }
2100
2101         if (mdev->state.pdsk < D_INCONSISTENT) {
2102                 /* In case we have the only disk of the cluster, */
2103                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2104                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2105                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2106                 drbd_al_begin_io(mdev, &peer_req->i);
2107         }
2108
2109         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2110         if (!err)
2111                 return 0;
2112
2113         /* don't care for the reason here */
2114         dev_err(DEV, "submit failed, triggering re-connect\n");
2115         spin_lock_irq(&mdev->tconn->req_lock);
2116         list_del(&peer_req->w.list);
2117         drbd_remove_epoch_entry_interval(mdev, peer_req);
2118         spin_unlock_irq(&mdev->tconn->req_lock);
2119         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2120                 drbd_al_complete_io(mdev, &peer_req->i);
2121
2122 out_interrupted:
2123         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2124         put_ldev(mdev);
2125         drbd_free_peer_req(mdev, peer_req);
2126         return err;
2127 }
2128
2129 /* We may throttle resync, if the lower device seems to be busy,
2130  * and current sync rate is above c_min_rate.
2131  *
2132  * To decide whether or not the lower device is busy, we use a scheme similar
2133  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2134  * amount (more than 64 sectors) of activity we cannot account for with our
2135  * own resync activity, it obviously is "busy".
2136  *
2137  * The current sync rate used here is computed from only the two most recent
2138  * step marks, to give a short time average so we can react faster.
2139  */
2140 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2141 {
2142         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2143         unsigned long db, dt, dbdt;
2144         struct lc_element *tmp;
2145         int curr_events;
2146         int throttle = 0;
2147
2148         /* feature disabled? */
2149         if (mdev->ldev->dc.c_min_rate == 0)
2150                 return 0;
2151
2152         spin_lock_irq(&mdev->al_lock);
2153         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2154         if (tmp) {
2155                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2156                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2157                         spin_unlock_irq(&mdev->al_lock);
2158                         return 0;
2159                 }
2160                 /* Do not slow down if app IO is already waiting for this extent */
2161         }
2162         spin_unlock_irq(&mdev->al_lock);
2163
2164         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2165                       (int)part_stat_read(&disk->part0, sectors[1]) -
2166                         atomic_read(&mdev->rs_sect_ev);
2167
2168         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2169                 unsigned long rs_left;
2170                 int i;
2171
2172                 mdev->rs_last_events = curr_events;
2173
2174                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2175                  * approx. */
2176                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2177
2178                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2179                         rs_left = mdev->ov_left;
2180                 else
2181                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2182
2183                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2184                 if (!dt)
2185                         dt++;
2186                 db = mdev->rs_mark_left[i] - rs_left;
2187                 dbdt = Bit2KB(db/dt);
2188
2189                 if (dbdt > mdev->ldev->dc.c_min_rate)
2190                         throttle = 1;
2191         }
2192         return throttle;
2193 }
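
/* Worked example for the decision above: if, say, 3000 bitmap bits were
 * cleared since step mark i and dt == 3 seconds, then db/dt == 1000 bits/s;
 * assuming the usual 4 KiB of data per bitmap bit, Bit2KB() turns that into
 * 4000 KiB/s, and we throttle whenever c_min_rate is configured below that. */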
2194
2195
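/* Serve a read request from the peer: P_DATA_REQUEST (an application read the
 * peer could not serve from its own disk), P_RS_DATA_REQUEST and
 * P_CSUM_RS_REQUEST (resync), or P_OV_REQUEST / P_OV_REPLY (online verify).
 * The block is read from our local disk and answered from a worker callback. */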
2196 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2197 {
2198         struct drbd_conf *mdev;
2199         sector_t sector;
2200         sector_t capacity;
2201         struct drbd_peer_request *peer_req;
2202         struct digest_info *di = NULL;
2203         int size, verb;
2204         unsigned int fault_type;
2205         struct p_block_req *p = pi->data;
2206
2207         mdev = vnr_to_mdev(tconn, pi->vnr);
2208         if (!mdev)
2209                 return -EIO;
2210         capacity = drbd_get_capacity(mdev->this_bdev);
2211
2212         sector = be64_to_cpu(p->sector);
2213         size   = be32_to_cpu(p->blksize);
2214
2215         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2216                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2217                                 (unsigned long long)sector, size);
2218                 return -EINVAL;
2219         }
2220         if (sector + (size>>9) > capacity) {
2221                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2222                                 (unsigned long long)sector, size);
2223                 return -EINVAL;
2224         }
2225
2226         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2227                 verb = 1;
2228                 switch (pi->cmd) {
2229                 case P_DATA_REQUEST:
2230                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2231                         break;
2232                 case P_RS_DATA_REQUEST:
2233                 case P_CSUM_RS_REQUEST:
2234                 case P_OV_REQUEST:
2235                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2236                         break;
2237                 case P_OV_REPLY:
2238                         verb = 0;
2239                         dec_rs_pending(mdev);
2240                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2241                         break;
2242                 default:
2243                         BUG();
2244                 }
2245                 if (verb && __ratelimit(&drbd_ratelimit_state))
2246                         dev_err(DEV, "Can not satisfy peer's read request, "
2247                             "no local data.\n");
2248
2249                 /* drain possible payload */
2250                 return drbd_drain_block(mdev, pi->size);
2251         }
2252
2253         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2254          * "criss-cross" setup, that might cause write-out on some other DRBD,
2255          * which in turn might block on the other node at this very place.  */
2256         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2257         if (!peer_req) {
2258                 put_ldev(mdev);
2259                 return -ENOMEM;
2260         }
2261
2262         switch (pi->cmd) {
2263         case P_DATA_REQUEST:
2264                 peer_req->w.cb = w_e_end_data_req;
2265                 fault_type = DRBD_FAULT_DT_RD;
2266                 /* application IO, don't drbd_rs_begin_io */
2267                 goto submit;
2268
2269         case P_RS_DATA_REQUEST:
2270                 peer_req->w.cb = w_e_end_rsdata_req;
2271                 fault_type = DRBD_FAULT_RS_RD;
2272                 /* used in the sector offset progress display */
2273                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2274                 break;
2275
2276         case P_OV_REPLY:
2277         case P_CSUM_RS_REQUEST:
2278                 fault_type = DRBD_FAULT_RS_RD;
2279                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2280                 if (!di)
2281                         goto out_free_e;
2282
2283                 di->digest_size = pi->size;
2284                 di->digest = (((char *)di)+sizeof(struct digest_info));
2285
2286                 peer_req->digest = di;
2287                 peer_req->flags |= EE_HAS_DIGEST;
2288
2289                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2290                         goto out_free_e;
2291
2292                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2293                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2294                         peer_req->w.cb = w_e_end_csum_rs_req;
2295                         /* used in the sector offset progress display */
2296                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2297                 } else if (pi->cmd == P_OV_REPLY) {
2298                         /* track progress, we may need to throttle */
2299                         atomic_add(size >> 9, &mdev->rs_sect_in);
2300                         peer_req->w.cb = w_e_end_ov_reply;
2301                         dec_rs_pending(mdev);
2302                         /* drbd_rs_begin_io done when we sent this request,
2303                          * but accounting still needs to be done. */
2304                         goto submit_for_resync;
2305                 }
2306                 break;
2307
2308         case P_OV_REQUEST:
2309                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2310                     mdev->tconn->agreed_pro_version >= 90) {
2311                         unsigned long now = jiffies;
2312                         int i;
2313                         mdev->ov_start_sector = sector;
2314                         mdev->ov_position = sector;
2315                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2316                         mdev->rs_total = mdev->ov_left;
2317                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2318                                 mdev->rs_mark_left[i] = mdev->ov_left;
2319                                 mdev->rs_mark_time[i] = now;
2320                         }
2321                         dev_info(DEV, "Online Verify start sector: %llu\n",
2322                                         (unsigned long long)sector);
2323                 }
2324                 peer_req->w.cb = w_e_end_ov_req;
2325                 fault_type = DRBD_FAULT_RS_RD;
2326                 break;
2327
2328         default:
2329                 BUG();
2330         }
2331
2332         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2333          * wrt the receiver, but it is not as straightforward as it may seem.
2334          * Various places in the resync start and stop logic assume resync
2335          * requests are processed in order, requeuing this on the worker thread
2336          * introduces a bunch of new code for synchronization between threads.
2337          *
2338          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2339          * "forever", throttling after drbd_rs_begin_io will lock that extent
2340          * for application writes for the same time.  For now, just throttle
2341          * here, where the rest of the code expects the receiver to sleep for
2342          * a while anyway.
2343          */
2344
2345         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2346          * this defers syncer requests for some time, before letting at least
2347          * one request through.  The resync controller on the receiving side
2348          * will adapt to the incoming rate accordingly.
2349          *
2350          * We cannot throttle here if remote is Primary/SyncTarget:
2351          * we would also throttle its application reads.
2352          * In that case, throttling is done on the SyncTarget only.
2353          */
2354         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2355                 schedule_timeout_uninterruptible(HZ/10);
2356         if (drbd_rs_begin_io(mdev, sector))
2357                 goto out_free_e;
2358
2359 submit_for_resync:
2360         atomic_add(size >> 9, &mdev->rs_sect_ev);
2361
2362 submit:
2363         inc_unacked(mdev);
2364         spin_lock_irq(&mdev->tconn->req_lock);
2365         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2366         spin_unlock_irq(&mdev->tconn->req_lock);
2367
2368         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2369                 return 0;
2370
2371         /* don't care for the reason here */
2372         dev_err(DEV, "submit failed, triggering re-connect\n");
2373         spin_lock_irq(&mdev->tconn->req_lock);
2374         list_del(&peer_req->w.list);
2375         spin_unlock_irq(&mdev->tconn->req_lock);
2376         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2377
2378 out_free_e:
2379         put_ldev(mdev);
2380         drbd_free_peer_req(mdev, peer_req);
2381         return -EIO;
2382 }
2383
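/* After-split-brain auto recovery policy for the "zero primaries" case.
 * Returns 1 if we keep our data (the peer syncs from us), -1 if we discard
 * ours (we sync from the peer), or -100 if no automatic decision was found. */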
2384 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2385 {
2386         int self, peer, rv = -100;
2387         unsigned long ch_self, ch_peer;
2388
2389         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2390         peer = mdev->p_uuid[UI_BITMAP] & 1;
2391
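        /* ch_* hold the number of blocks each side changed while disconnected
         * (its out-of-sync bitmap weight); the peer reports its count in the
         * UI_SIZE slot of the uuid array.  Used by discard-least-changes. */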
2392         ch_peer = mdev->p_uuid[UI_SIZE];
2393         ch_self = mdev->comm_bm_set;
2394
2395         switch (mdev->tconn->net_conf->after_sb_0p) {
2396         case ASB_CONSENSUS:
2397         case ASB_DISCARD_SECONDARY:
2398         case ASB_CALL_HELPER:
2399                 dev_err(DEV, "Configuration error.\n");
2400                 break;
2401         case ASB_DISCONNECT:
2402                 break;
2403         case ASB_DISCARD_YOUNGER_PRI:
2404                 if (self == 0 && peer == 1) {
2405                         rv = -1;
2406                         break;
2407                 }
2408                 if (self == 1 && peer == 0) {
2409                         rv =  1;
2410                         break;
2411                 }
2412                 /* Else fall through to one of the other strategies... */
2413         case ASB_DISCARD_OLDER_PRI:
2414                 if (self == 0 && peer == 1) {
2415                         rv = 1;
2416                         break;
2417                 }
2418                 if (self == 1 && peer == 0) {
2419                         rv = -1;
2420                         break;
2421                 }
2422                 /* Else fall through to one of the other strategies... */
2423                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2424                      "Using discard-least-changes instead\n");
2425         case ASB_DISCARD_ZERO_CHG:
2426                 if (ch_peer == 0 && ch_self == 0) {
2427                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2428                                 ? -1 : 1;
2429                         break;
2430                 } else {
2431                         if (ch_peer == 0) { rv =  1; break; }
2432                         if (ch_self == 0) { rv = -1; break; }
2433                 }
2434                 if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2435                         break;
2436         case ASB_DISCARD_LEAST_CHG:
2437                 if      (ch_self < ch_peer)
2438                         rv = -1;
2439                 else if (ch_self > ch_peer)
2440                         rv =  1;
2441                 else /* ( ch_self == ch_peer ) */
2442                      /* Well, then use something else. */
2443                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2444                                 ? -1 : 1;
2445                 break;
2446         case ASB_DISCARD_LOCAL:
2447                 rv = -1;
2448                 break;
2449         case ASB_DISCARD_REMOTE:
2450                 rv =  1;
2451         }
2452
2453         return rv;
2454 }
2455
2456 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2457 {
2458         int hg, rv = -100;
2459
2460         switch (mdev->tconn->net_conf->after_sb_1p) {
2461         case ASB_DISCARD_YOUNGER_PRI:
2462         case ASB_DISCARD_OLDER_PRI:
2463         case ASB_DISCARD_LEAST_CHG:
2464         case ASB_DISCARD_LOCAL:
2465         case ASB_DISCARD_REMOTE:
2466                 dev_err(DEV, "Configuration error.\n");
2467                 break;
2468         case ASB_DISCONNECT:
2469                 break;
2470         case ASB_CONSENSUS:
2471                 hg = drbd_asb_recover_0p(mdev);
2472                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2473                         rv = hg;
2474                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2475                         rv = hg;
2476                 break;
2477         case ASB_VIOLENTLY:
2478                 rv = drbd_asb_recover_0p(mdev);
2479                 break;
2480         case ASB_DISCARD_SECONDARY:
2481                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2482         case ASB_CALL_HELPER:
2483                 hg = drbd_asb_recover_0p(mdev);
2484                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2485                         enum drbd_state_rv rv2;
2486
2487                         drbd_set_role(mdev, R_SECONDARY, 0);
2488                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2489                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2490                           * we do not need to wait for the after state change work either. */
2491                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2492                         if (rv2 != SS_SUCCESS) {
2493                                 drbd_khelper(mdev, "pri-lost-after-sb");
2494                         } else {
2495                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2496                                 rv = hg;
2497                         }
2498                 } else
2499                         rv = hg;
2500         }
2501
2502         return rv;
2503 }
2504
2505 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2506 {
2507         int hg, rv = -100;
2508
2509         switch (mdev->tconn->net_conf->after_sb_2p) {
2510         case ASB_DISCARD_YOUNGER_PRI:
2511         case ASB_DISCARD_OLDER_PRI:
2512         case ASB_DISCARD_LEAST_CHG:
2513         case ASB_DISCARD_LOCAL:
2514         case ASB_DISCARD_REMOTE:
2515         case ASB_CONSENSUS:
2516         case ASB_DISCARD_SECONDARY:
2517                 dev_err(DEV, "Configuration error.\n");
2518                 break;
2519         case ASB_VIOLENTLY:
2520                 rv = drbd_asb_recover_0p(mdev);
2521                 break;
2522         case ASB_DISCONNECT:
2523                 break;
2524         case ASB_CALL_HELPER:
2525                 hg = drbd_asb_recover_0p(mdev);
2526                 if (hg == -1) {
2527                         enum drbd_state_rv rv2;
2528
2529                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2530                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2531                           * we do not need to wait for the after state change work either. */
2532                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2533                         if (rv2 != SS_SUCCESS) {
2534                                 drbd_khelper(mdev, "pri-lost-after-sb");
2535                         } else {
2536                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2537                                 rv = hg;
2538                         }
2539                 } else
2540                         rv = hg;
2541         }
2542
2543         return rv;
2544 }
2545
2546 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2547                            u64 bits, u64 flags)
2548 {
2549         if (!uuid) {
2550                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2551                 return;
2552         }
2553         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2554              text,
2555              (unsigned long long)uuid[UI_CURRENT],
2556              (unsigned long long)uuid[UI_BITMAP],
2557              (unsigned long long)uuid[UI_HISTORY_START],
2558              (unsigned long long)uuid[UI_HISTORY_END],
2559              (unsigned long long)bits,
2560              (unsigned long long)flags);
2561 }
2562
2563 /*
2564   100   after split brain try auto recover
2565     2   C_SYNC_SOURCE set BitMap
2566     1   C_SYNC_SOURCE use BitMap
2567     0   no Sync
2568    -1   C_SYNC_TARGET use BitMap
2569    -2   C_SYNC_TARGET set BitMap
2570  -100   after split brain, disconnect
2571 -1000   unrelated data
2572 -1091   requires proto 91
2573 -1096   requires proto 96
2574  */
2575 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2576 {
2577         u64 self, peer;
2578         int i, j;
2579
2580         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2581         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2582
2583         *rule_nr = 10;
2584         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2585                 return 0;
2586
2587         *rule_nr = 20;
2588         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2589              peer != UUID_JUST_CREATED)
2590                 return -2;
2591
2592         *rule_nr = 30;
2593         if (self != UUID_JUST_CREATED &&
2594             (peer == UUID_JUST_CREATED || peer == (u64)0))
2595                 return 2;
2596
2597         if (self == peer) {
2598                 int rct, dc; /* roles at crash time */
2599
2600                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2601
2602                         if (mdev->tconn->agreed_pro_version < 91)
2603                                 return -1091;
2604
2605                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2606                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2607                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2608                                 drbd_uuid_set_bm(mdev, 0UL);
2609
2610                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2611                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2612                                 *rule_nr = 34;
2613                         } else {
2614                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2615                                 *rule_nr = 36;
2616                         }
2617
2618                         return 1;
2619                 }
2620
2621                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2622
2623                         if (mdev->tconn->agreed_pro_version < 91)
2624                                 return -1091;
2625
2626                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2627                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2628                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2629
2630                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2631                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2632                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2633
2634                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2635                                 *rule_nr = 35;
2636                         } else {
2637                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2638                                 *rule_nr = 37;
2639                         }
2640
2641                         return -1;
2642                 }
2643
2644                 /* Common power [off|failure] */
2645                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2646                         (mdev->p_uuid[UI_FLAGS] & 2);
2647                 /* lowest bit is set when we were primary,
2648                  * next bit (weight 2) is set when peer was primary */
2649                 *rule_nr = 40;
2650
2651                 switch (rct) {
2652                 case 0: /* !self_pri && !peer_pri */ return 0;
2653                 case 1: /*  self_pri && !peer_pri */ return 1;
2654                 case 2: /* !self_pri &&  peer_pri */ return -1;
2655                 case 3: /*  self_pri &&  peer_pri */
2656                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2657                         return dc ? -1 : 1;
2658                 }
2659         }
2660
2661         *rule_nr = 50;
2662         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2663         if (self == peer)
2664                 return -1;
2665
2666         *rule_nr = 51;
2667         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2668         if (self == peer) {
2669                 if (mdev->tconn->agreed_pro_version < 96 ?
2670                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2671                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2672                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2673                         /* The last P_SYNC_UUID did not get through. Undo the changes the peer
2674                            made to its UUIDs when it last started a resync as sync source. */
2675
2676                         if (mdev->tconn->agreed_pro_version < 91)
2677                                 return -1091;
2678
2679                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2680                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2681
2682                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2683                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2684
2685                         return -1;
2686                 }
2687         }
2688
2689         *rule_nr = 60;
2690         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2691         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2692                 peer = mdev->p_uuid[i] & ~((u64)1);
2693                 if (self == peer)
2694                         return -2;
2695         }
2696
2697         *rule_nr = 70;
2698         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2699         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2700         if (self == peer)
2701                 return 1;
2702
2703         *rule_nr = 71;
2704         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2705         if (self == peer) {
2706                 if (mdev->tconn->agreed_pro_version < 96 ?
2707                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2708                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2709                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2710                         /* The last P_SYNC_UUID did not get through. Undo the changes we made
2711                            to our UUIDs when we last started a resync as sync source. */
2712
2713                         if (mdev->tconn->agreed_pro_version < 91)
2714                                 return -1091;
2715
2716                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2717                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2718
2719                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2720                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2721                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2722
2723                         return 1;
2724                 }
2725         }
2726
2727
2728         *rule_nr = 80;
2729         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2730         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2731                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2732                 if (self == peer)
2733                         return 2;
2734         }
2735
2736         *rule_nr = 90;
2737         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2738         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2739         if (self == peer && self != ((u64)0))
2740                 return 100;
2741
2742         *rule_nr = 100;
2743         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2744                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2745                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2746                         peer = mdev->p_uuid[j] & ~((u64)1);
2747                         if (self == peer)
2748                                 return -100;
2749                 }
2750         }
2751
2752         return -1000;
2753 }
2754
2755 /* drbd_sync_handshake() returns the new conn state on success, or
2756    C_MASK on failure.
2757  */
2758 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2759                                            enum drbd_disk_state peer_disk) __must_hold(local)
2760 {
2761         int hg, rule_nr;
2762         enum drbd_conns rv = C_MASK;
2763         enum drbd_disk_state mydisk;
2764
2765         mydisk = mdev->state.disk;
2766         if (mydisk == D_NEGOTIATING)
2767                 mydisk = mdev->new_state_tmp.disk;
2768
2769         dev_info(DEV, "drbd_sync_handshake:\n");
2770         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2771         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2772                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2773
2774         hg = drbd_uuid_compare(mdev, &rule_nr);
2775
2776         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2777
2778         if (hg == -1000) {
2779                 dev_alert(DEV, "Unrelated data, aborting!\n");
2780                 return C_MASK;
2781         }
2782         if (hg < -1000) {
2783                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2784                 return C_MASK;
2785         }
2786
2787         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2788             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2789                 int f = (hg == -100) || abs(hg) == 2;
2790                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2791                 if (f)
2792                         hg = hg*2;
2793                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2794                      hg > 0 ? "source" : "target");
2795         }
2796
2797         if (abs(hg) == 100)
2798                 drbd_khelper(mdev, "initial-split-brain");
2799
2800         if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2801                 int pcount = (mdev->state.role == R_PRIMARY)
2802                            + (peer_role == R_PRIMARY);
2803                 int forced = (hg == -100);
2804
2805                 switch (pcount) {
2806                 case 0:
2807                         hg = drbd_asb_recover_0p(mdev);
2808                         break;
2809                 case 1:
2810                         hg = drbd_asb_recover_1p(mdev);
2811                         break;
2812                 case 2:
2813                         hg = drbd_asb_recover_2p(mdev);
2814                         break;
2815                 }
2816                 if (abs(hg) < 100) {
2817                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2818                              "automatically solved. Sync from %s node\n",
2819                              pcount, (hg < 0) ? "peer" : "this");
2820                         if (forced) {
2821                                 dev_warn(DEV, "Doing a full sync, since"
2822                                      " UUIDs were ambiguous.\n");
2823                                 hg = hg*2;
2824                         }
2825                 }
2826         }
2827
2828         if (hg == -100) {
2829                 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2830                         hg = -1;
2831                 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2832                         hg = 1;
2833
2834                 if (abs(hg) < 100)
2835                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2836                              "Sync from %s node\n",
2837                              (hg < 0) ? "peer" : "this");
2838         }
2839
2840         if (hg == -100) {
2841                 /* FIXME this log message is not correct if we end up here
2842                  * after an attempted attach on a diskless node.
2843                  * We just refuse to attach -- well, we drop the "connection"
2844                  * to that disk, in a way... */
2845                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2846                 drbd_khelper(mdev, "split-brain");
2847                 return C_MASK;
2848         }
2849
2850         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2851                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2852                 return C_MASK;
2853         }
2854
2855         if (hg < 0 && /* by intention we do not use mydisk here. */
2856             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2857                 switch (mdev->tconn->net_conf->rr_conflict) {
2858                 case ASB_CALL_HELPER:
2859                         drbd_khelper(mdev, "pri-lost");
2860                         /* fall through */
2861                 case ASB_DISCONNECT:
2862                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2863                         return C_MASK;
2864                 case ASB_VIOLENTLY:
2865                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2866                              " assumption\n");
2867                 }
2868         }
2869
2870         if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2871                 if (hg == 0)
2872                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2873                 else
2874                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2875                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2876                                  abs(hg) >= 2 ? "full" : "bit-map based");
2877                 return C_MASK;
2878         }
2879
2880         if (abs(hg) >= 2) {
2881                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2882                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2883                                         BM_LOCKED_SET_ALLOWED))
2884                         return C_MASK;
2885         }
2886
2887         if (hg > 0) { /* become sync source. */
2888                 rv = C_WF_BITMAP_S;
2889         } else if (hg < 0) { /* become sync target */
2890                 rv = C_WF_BITMAP_T;
2891         } else {
2892                 rv = C_CONNECTED;
2893                 if (drbd_bm_total_weight(mdev)) {
2894                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2895                              drbd_bm_total_weight(mdev));
2896                 }
2897         }
2898
2899         return rv;
2900 }
2901
2902 /* returns 1 if invalid */
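/* Illustrative examples (interpretation derived from the checks below):
 *   peer=ASB_DISCARD_REMOTE, self=ASB_DISCARD_LOCAL -> 0 (compatible,
 *     both sides agree to discard this node's data)
 *   peer=ASB_DISCARD_LOCAL,  self=ASB_DISCARD_LOCAL -> 1 (invalid,
 *     each side would discard its own data)
 *   peer=self=ASB_DISCONNECT                        -> 0 (compatible)
 */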
2903 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2904 {
2905         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2906         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2907             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2908                 return 0;
2909
2910         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2911         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2912             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2913                 return 1;
2914
2915         /* everything else is valid if they are equal on both sides. */
2916         if (peer == self)
2917                 return 0;
2918
2919         /* everything else is invalid. */
2920         return 1;
2921 }
2922
2923 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2924 {
2925         struct p_protocol *p = pi->data;
2926         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2927         int p_want_lose, p_two_primaries, cf;
2928         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2929
2930         p_proto         = be32_to_cpu(p->protocol);
2931         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2932         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2933         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2934         p_two_primaries = be32_to_cpu(p->two_primaries);
2935         cf              = be32_to_cpu(p->conn_flags);
2936         p_want_lose = cf & CF_WANT_LOSE;
2937
2938         clear_bit(CONN_DRY_RUN, &tconn->flags);
2939
2940         if (cf & CF_DRY_RUN)
2941                 set_bit(CONN_DRY_RUN, &tconn->flags);
2942
2943         if (p_proto != tconn->net_conf->wire_protocol && tconn->agreed_pro_version < 100) {
2944                 conn_err(tconn, "incompatible communication protocols\n");
2945                 goto disconnect;
2946         }
2947
2948         if (cmp_after_sb(p_after_sb_0p, tconn->net_conf->after_sb_0p)) {
2949                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
2950                 goto disconnect;
2951         }
2952
2953         if (cmp_after_sb(p_after_sb_1p, tconn->net_conf->after_sb_1p)) {
2954                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
2955                 goto disconnect;
2956         }
2957
2958         if (cmp_after_sb(p_after_sb_2p, tconn->net_conf->after_sb_2p)) {
2959                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
2960                 goto disconnect;
2961         }
2962
2963         if (p_want_lose && tconn->net_conf->want_lose) {
2964                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
2965                 goto disconnect;
2966         }
2967
2968         if (p_two_primaries != tconn->net_conf->two_primaries) {
2969                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
2970                 goto disconnect;
2971         }
2972
2973         if (tconn->agreed_pro_version >= 87) {
2974                 unsigned char *my_alg = tconn->net_conf->integrity_alg;
2975                 int err;
2976
2977                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
2978                 if (err)
2979                         return err;
2980
2981                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2982                 if (strcmp(p_integrity_alg, my_alg)) {
2983                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
2984                         goto disconnect;
2985                 }
2986                 conn_info(tconn, "data-integrity-alg: %s\n",
2987                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2988         }
2989
2990         return 0;
2991
2992 disconnect:
2993         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
2994         return -EIO;
2995 }
2996
2997 /* helper function
2998  * input: alg name, feature name
2999  * return: NULL (alg name was "")
3000  *         ERR_PTR(error) if something goes wrong
3001  *         or the crypto hash ptr, if it worked out ok. */
3002 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3003                 const char *alg, const char *name)
3004 {
3005         struct crypto_hash *tfm;
3006
3007         if (!alg[0])
3008                 return NULL;
3009
3010         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3011         if (IS_ERR(tfm)) {
3012                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3013                         alg, name, PTR_ERR(tfm));
3014                 return tfm;
3015         }
3016         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3017                 crypto_free_hash(tfm);
3018                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3019                 return ERR_PTR(-EINVAL);
3020         }
3021         return tfm;
3022 }
3023
3024 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3025 {
3026         void *buffer = tconn->data.rbuf;
3027         int size = pi->size;
3028
3029         while (size) {
3030                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3031                 s = drbd_recv(tconn, buffer, s);
3032                 if (s <= 0) {
3033                         if (s < 0)
3034                                 return s;
3035                         break;
3036                 }
3037                 size -= s;
3038         }
3039         if (size)
3040                 return -EIO;
3041         return 0;
3042 }
3043
3044 /*
3045  * config_unknown_volume  -  device configuration command for unknown volume
3046  *
3047  * When a device is added to an existing connection, the node on which the
3048  * device is added first will send configuration commands to its peer but the
3049  * peer will not know about the device yet.  It will warn and ignore these
3050  * commands.  Once the device is added on the second node, the second node will
3051  * send the same device configuration commands, but in the other direction.
3052  *
3053  * (We can also end up here if drbd is misconfigured.)
3054  */
3055 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3056 {
3057         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3058                   pi->vnr, cmdname(pi->cmd));
3059         return ignore_remaining_packet(tconn, pi);
3060 }
3061
3062 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3063 {
3064         struct drbd_conf *mdev;
3065         struct p_rs_param_95 *p;
3066         unsigned int header_size, data_size, exp_max_sz;
3067         struct crypto_hash *verify_tfm = NULL;
3068         struct crypto_hash *csums_tfm = NULL;
3069         const int apv = tconn->agreed_pro_version;
3070         int *rs_plan_s = NULL;
3071         int fifo_size = 0;
3072         int err;
3073
3074         mdev = vnr_to_mdev(tconn, pi->vnr);
3075         if (!mdev)
3076                 return config_unknown_volume(tconn, pi);
3077
3078         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3079                     : apv == 88 ? sizeof(struct p_rs_param)
3080                                         + SHARED_SECRET_MAX
3081                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3082                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3083
3084         if (pi->size > exp_max_sz) {
3085                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3086                     pi->size, exp_max_sz);
3087                 return -EIO;
3088         }
3089
3090         if (apv <= 88) {
3091                 header_size = sizeof(struct p_rs_param);
3092                 data_size = pi->size - header_size;
3093         } else if (apv <= 94) {
3094                 header_size = sizeof(struct p_rs_param_89);
3095                 data_size = pi->size - header_size;
3096                 D_ASSERT(data_size == 0);
3097         } else {
3098                 header_size = sizeof(struct p_rs_param_95);
3099                 data_size = pi->size - header_size;
3100                 D_ASSERT(data_size == 0);
3101         }
3102
3103         /* initialize verify_alg and csums_alg */
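        /* (verify_alg and csums_alg are assumed to be adjacent
         * SHARED_SECRET_MAX sized arrays in the rs_param structure,
         * so one memset of 2 * SHARED_SECRET_MAX clears both) */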
3104         p = pi->data;
3105         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3106
3107         err = drbd_recv_all(mdev->tconn, p, header_size);
3108         if (err)
3109                 return err;
3110
3111         if (get_ldev(mdev)) {
3112                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3113                 put_ldev(mdev);
3114         }
3115
3116         if (apv >= 88) {
3117                 if (apv == 88) {
3118                         if (data_size > SHARED_SECRET_MAX) {
3119                                 dev_err(DEV, "verify-alg too long, "
3120                                     "peer wants %u, accepting only %u byte\n",
3121                                                 data_size, SHARED_SECRET_MAX);
3122                                 return -EIO;
3123                         }
3124
3125                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3126                         if (err)
3127                                 return err;
3128
3129                         /* we expect NUL terminated string */
3130                         /* but just in case someone tries to be evil */
3131                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3132                         p->verify_alg[data_size-1] = 0;
3133
3134                 } else /* apv >= 89 */ {
3135                         /* we still expect NUL terminated strings */
3136                         /* but just in case someone tries to be evil */
3137                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3138                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3139                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3140                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3141                 }
3142
3143                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3144                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3145                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3146                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3147                                 goto disconnect;
3148                         }
3149                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3150                                         p->verify_alg, "verify-alg");
3151                         if (IS_ERR(verify_tfm)) {
3152                                 verify_tfm = NULL;
3153                                 goto disconnect;
3154                         }
3155                 }
3156
3157                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3158                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3159                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3160                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3161                                 goto disconnect;
3162                         }
3163                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3164                                         p->csums_alg, "csums-alg");
3165                         if (IS_ERR(csums_tfm)) {
3166                                 csums_tfm = NULL;
3167                                 goto disconnect;
3168                         }
3169                 }
3170
3171                 if (apv > 94 && get_ldev(mdev)) {
3172                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3173                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3174                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3175                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3176                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3177
3178                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3179                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3180                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3181                                 if (!rs_plan_s) {
3182                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3183                                         put_ldev(mdev);
3184                                         goto disconnect;
3185                                 }
3186                         }
3187                         put_ldev(mdev);
3188                 }
3189
3190                 spin_lock(&mdev->peer_seq_lock);
3191                 /* lock against drbd_nl_syncer_conf() */
3192                 if (verify_tfm) {
3193                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3194                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3195                         crypto_free_hash(mdev->tconn->verify_tfm);
3196                         mdev->tconn->verify_tfm = verify_tfm;
3197                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3198                 }
3199                 if (csums_tfm) {
3200                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3201                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3202                         crypto_free_hash(mdev->tconn->csums_tfm);
3203                         mdev->tconn->csums_tfm = csums_tfm;
3204                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3205                 }
3206                 if (fifo_size != mdev->rs_plan_s.size) {
3207                         kfree(mdev->rs_plan_s.values);
3208                         mdev->rs_plan_s.values = rs_plan_s;
3209                         mdev->rs_plan_s.size   = fifo_size;
3210                         mdev->rs_planed = 0;
3211                 }
3212                 spin_unlock(&mdev->peer_seq_lock);
3213         }
3214         return 0;
3215
3216 disconnect:
3217         /* just for completeness: actually not needed,
3218          * as this is not reached if csums_tfm was ok. */
3219         crypto_free_hash(csums_tfm);
3220         /* but free the verify_tfm again, if csums_tfm did not work out */
3221         crypto_free_hash(verify_tfm);
3222         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3223         return -EIO;
3224 }
3225
3226 /* warn if the arguments differ by more than 12.5% */
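/* (a >> 3 is a/8, i.e. 12.5%.  For example, with a = 1000 and b = 800
 * sectors the difference of 200 exceeds 1000/8 = 125, so we warn.) */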
3227 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3228         const char *s, sector_t a, sector_t b)
3229 {
3230         sector_t d;
3231         if (a == 0 || b == 0)
3232                 return;
3233         d = (a > b) ? (a - b) : (b - a);
3234         if (d > (a>>3) || d > (b>>3))
3235                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3236                      (unsigned long long)a, (unsigned long long)b);
3237 }
3238
3239 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3240 {
3241         struct drbd_conf *mdev;
3242         struct p_sizes *p = pi->data;
3243         enum determine_dev_size dd = unchanged;
3244         sector_t p_size, p_usize, my_usize;
3245         int ldsc = 0; /* local disk size changed */
3246         enum dds_flags ddsf;
3247
3248         mdev = vnr_to_mdev(tconn, pi->vnr);
3249         if (!mdev)
3250                 return config_unknown_volume(tconn, pi);
3251
3252         p_size = be64_to_cpu(p->d_size);
3253         p_usize = be64_to_cpu(p->u_size);
3254
3255         /* just store the peer's disk size for now.
3256          * we still need to figure out whether we accept that. */
3257         mdev->p_size = p_size;
3258
3259         if (get_ldev(mdev)) {
3260                 warn_if_differ_considerably(mdev, "lower level device sizes",
3261                            p_size, drbd_get_max_capacity(mdev->ldev));
3262                 warn_if_differ_considerably(mdev, "user requested size",
3263                                             p_usize, mdev->ldev->dc.disk_size);
3264
3265                 /* if this is the first connect, or an otherwise expected
3266                  * param exchange, choose the minimum */
3267                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3268                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3269                                              p_usize);
3270
3271                 my_usize = mdev->ldev->dc.disk_size;
3272
3273                 if (mdev->ldev->dc.disk_size != p_usize) {
3274                         mdev->ldev->dc.disk_size = p_usize;
3275                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3276                              (unsigned long)mdev->ldev->dc.disk_size);
3277                 }
3278
3279                 /* Never shrink a device with usable data during connect.
3280                    But allow online shrinking if we are connected. */
3281                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3282                    drbd_get_capacity(mdev->this_bdev) &&
3283                    mdev->state.disk >= D_OUTDATED &&
3284                    mdev->state.conn < C_CONNECTED) {
3285                         dev_err(DEV, "The peer's disk size is too small!\n");
3286                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3287                         mdev->ldev->dc.disk_size = my_usize;
3288                         put_ldev(mdev);
3289                         return -EIO;
3290                 }
3291                 put_ldev(mdev);
3292         }
3293
3294         ddsf = be16_to_cpu(p->dds_flags);
3295         if (get_ldev(mdev)) {
3296                 dd = drbd_determine_dev_size(mdev, ddsf);
3297                 put_ldev(mdev);
3298                 if (dd == dev_size_error)
3299                         return -EIO;
3300                 drbd_md_sync(mdev);
3301         } else {
3302                 /* I am diskless, need to accept the peer's size. */
3303                 drbd_set_my_capacity(mdev, p_size);
3304         }
3305
3306         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3307         drbd_reconsider_max_bio_size(mdev);
3308
3309         if (get_ldev(mdev)) {
3310                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3311                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3312                         ldsc = 1;
3313                 }
3314
3315                 put_ldev(mdev);
3316         }
3317
3318         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3319                 if (be64_to_cpu(p->c_size) !=
3320                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3321                         /* we have different sizes, probably peer
3322                          * needs to know my new size... */
3323                         drbd_send_sizes(mdev, 0, ddsf);
3324                 }
3325                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3326                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3327                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3328                             mdev->state.disk >= D_INCONSISTENT) {
3329                                 if (ddsf & DDSF_NO_RESYNC)
3330                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3331                                 else
3332                                         resync_after_online_grow(mdev);
3333                         } else
3334                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3335                 }
3336         }
3337
3338         return 0;
3339 }
3340
3341 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3342 {
3343         struct drbd_conf *mdev;
3344         struct p_uuids *p = pi->data;
3345         u64 *p_uuid;
3346         int i, updated_uuids = 0;
3347
3348         mdev = vnr_to_mdev(tconn, pi->vnr);
3349         if (!mdev)
3350                 return config_unknown_volume(tconn, pi);
3351
3352         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3353
3354         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3355                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3356
3357         kfree(mdev->p_uuid);
3358         mdev->p_uuid = p_uuid;
3359
3360         if (mdev->state.conn < C_CONNECTED &&
3361             mdev->state.disk < D_INCONSISTENT &&
3362             mdev->state.role == R_PRIMARY &&
3363             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3364                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3365                     (unsigned long long)mdev->ed_uuid);
3366                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3367                 return -EIO;
3368         }
3369
3370         if (get_ldev(mdev)) {
3371                 int skip_initial_sync =
3372                         mdev->state.conn == C_CONNECTED &&
3373                         mdev->tconn->agreed_pro_version >= 90 &&
3374                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3375                         (p_uuid[UI_FLAGS] & 8);
3376                 if (skip_initial_sync) {
3377                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3378                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3379                                         "clear_n_write from receive_uuids",
3380                                         BM_LOCKED_TEST_ALLOWED);
3381                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3382                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3383                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3384                                         CS_VERBOSE, NULL);
3385                         drbd_md_sync(mdev);
3386                         updated_uuids = 1;
3387                 }
3388                 put_ldev(mdev);
3389         } else if (mdev->state.disk < D_INCONSISTENT &&
3390                    mdev->state.role == R_PRIMARY) {
3391                 /* I am a diskless primary, the peer just created a new current UUID
3392                    for me. */
3393                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3394         }
3395
3396         /* Before we test the disk state, we should wait until a possibly
3397            ongoing cluster-wide state change has finished. That is important if
3398            we are primary and are detaching from our disk. We need to see the
3399            new disk state... */
3400         mutex_lock(mdev->state_mutex);
3401         mutex_unlock(mdev->state_mutex);
3402         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3403                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3404
3405         if (updated_uuids)
3406                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3407
3408         return 0;
3409 }
3410
3411 /**
3412  * convert_state() - Converts the peer's view of the cluster state to our point of view
3413  * @ps:         The state as seen by the peer.
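 *
 * The peer reports the state from its own point of view, so role/peer and
 * disk/pdsk are swapped here.  Illustrative example (values assumed): a peer
 * reporting role=Primary, peer=Secondary, disk=UpToDate, pdsk=Inconsistent
 * converts to our view of role=Secondary, peer=Primary, disk=Inconsistent,
 * pdsk=UpToDate.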
3414  */
3415 static union drbd_state convert_state(union drbd_state ps)
3416 {
3417         union drbd_state ms;
3418
3419         static enum drbd_conns c_tab[] = {
3420                 [C_CONNECTED] = C_CONNECTED,
3421
3422                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3423                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3424                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3425                 [C_VERIFY_S]       = C_VERIFY_T,
3426                 [C_MASK]   = C_MASK,
3427         };
3428
3429         ms.i = ps.i;
3430
3431         ms.conn = c_tab[ps.conn];
3432         ms.peer = ps.role;
3433         ms.role = ps.peer;
3434         ms.pdsk = ps.disk;
3435         ms.disk = ps.pdsk;
3436         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3437
3438         return ms;
3439 }
3440
3441 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3442 {
3443         struct drbd_conf *mdev;
3444         struct p_req_state *p = pi->data;
3445         union drbd_state mask, val;
3446         enum drbd_state_rv rv;
3447
3448         mdev = vnr_to_mdev(tconn, pi->vnr);
3449         if (!mdev)
3450                 return -EIO;
3451
3452         mask.i = be32_to_cpu(p->mask);
3453         val.i = be32_to_cpu(p->val);
3454
3455         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3456             mutex_is_locked(mdev->state_mutex)) {
3457                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3458                 return 0;
3459         }
3460
3461         mask = convert_state(mask);
3462         val = convert_state(val);
3463
3464         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3465         drbd_send_sr_reply(mdev, rv);
3466
3467         drbd_md_sync(mdev);
3468
3469         return 0;
3470 }
3471
3472 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3473 {
3474         struct p_req_state *p = pi->data;
3475         union drbd_state mask, val;
3476         enum drbd_state_rv rv;
3477
3478         mask.i = be32_to_cpu(p->mask);
3479         val.i = be32_to_cpu(p->val);
3480
3481         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3482             mutex_is_locked(&tconn->cstate_mutex)) {
3483                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3484                 return 0;
3485         }
3486
3487         mask = convert_state(mask);
3488         val = convert_state(val);
3489
3490         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3491         conn_send_sr_reply(tconn, rv);
3492
3493         return 0;
3494 }
3495
3496 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3497 {
3498         struct drbd_conf *mdev;
3499         struct p_state *p = pi->data;
3500         union drbd_state os, ns, peer_state;
3501         enum drbd_disk_state real_peer_disk;
3502         enum chg_state_flags cs_flags;
3503         int rv;
3504
3505         mdev = vnr_to_mdev(tconn, pi->vnr);
3506         if (!mdev)
3507                 return config_unknown_volume(tconn, pi);
3508
3509         peer_state.i = be32_to_cpu(p->state);
3510
3511         real_peer_disk = peer_state.disk;
3512         if (peer_state.disk == D_NEGOTIATING) {
3513                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3514                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3515         }
3516
3517         spin_lock_irq(&mdev->tconn->req_lock);
3518  retry:
3519         os = ns = drbd_read_state(mdev);
3520         spin_unlock_irq(&mdev->tconn->req_lock);
3521
3522         /* peer says his disk is uptodate, while we think it is inconsistent,
3523          * and this happens while we think we have a sync going on. */
3524         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3525             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3526                 /* If we are (becoming) SyncSource, but peer is still in sync
3527                  * preparation, ignore its uptodate-ness to avoid flapping, it
3528                  * will change to inconsistent once the peer reaches active
3529                  * syncing states.
3530                  * It may have changed syncer-paused flags, however, so we
3531                  * cannot ignore this completely. */
3532                 if (peer_state.conn > C_CONNECTED &&
3533                     peer_state.conn < C_SYNC_SOURCE)
3534                         real_peer_disk = D_INCONSISTENT;
3535
3536                 /* if peer_state changes to connected at the same time,
3537                  * it explicitly notifies us that it finished resync.
3538                  * Maybe we should finish it up, too? */
3539                 else if (os.conn >= C_SYNC_SOURCE &&
3540                          peer_state.conn == C_CONNECTED) {
3541                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3542                                 drbd_resync_finished(mdev);
3543                         return 0;
3544                 }
3545         }
3546
3547         /* peer says his disk is inconsistent, while we think it is uptodate,
3548          * and this happens while the peer still thinks we have a sync going on,
3549          * but we think we are already done with the sync.
3550          * We ignore this to avoid flapping pdsk.
3551          * This should not happen, if the peer is a recent version of drbd. */
3552         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3553             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3554                 real_peer_disk = D_UP_TO_DATE;
3555
3556         if (ns.conn == C_WF_REPORT_PARAMS)
3557                 ns.conn = C_CONNECTED;
3558
3559         if (peer_state.conn == C_AHEAD)
3560                 ns.conn = C_BEHIND;
3561
3562         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3563             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3564                 int cr; /* consider resync */
3565
3566                 /* if we established a new connection */
3567                 cr  = (os.conn < C_CONNECTED);
3568                 /* if we had an established connection
3569                  * and one of the nodes newly attaches a disk */
3570                 cr |= (os.conn == C_CONNECTED &&
3571                        (peer_state.disk == D_NEGOTIATING ||
3572                         os.disk == D_NEGOTIATING));
3573                 /* if we have both been inconsistent, and the peer has been
3574                  * forced to be UpToDate with --overwrite-data */
3575                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3576                 /* if we had been plain connected, and the admin requested to
3577                  * start a sync by "invalidate" or "invalidate-remote" */
3578                 cr |= (os.conn == C_CONNECTED &&
3579                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3580                                  peer_state.conn <= C_WF_BITMAP_T));
3581
3582                 if (cr)
3583                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3584
3585                 put_ldev(mdev);
3586                 if (ns.conn == C_MASK) {
3587                         ns.conn = C_CONNECTED;
3588                         if (mdev->state.disk == D_NEGOTIATING) {
3589                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3590                         } else if (peer_state.disk == D_NEGOTIATING) {
3591                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3592                                 peer_state.disk = D_DISKLESS;
3593                                 real_peer_disk = D_DISKLESS;
3594                         } else {
3595                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3596                                         return -EIO;
3597                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3598                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3599                                 return -EIO;
3600                         }
3601                 }
3602         }
3603
3604         spin_lock_irq(&mdev->tconn->req_lock);
3605         if (os.i != drbd_read_state(mdev).i)
3606                 goto retry;
3607         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3608         ns.peer = peer_state.role;
3609         ns.pdsk = real_peer_disk;
3610         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3611         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3612                 ns.disk = mdev->new_state_tmp.disk;
3613         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3614         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3615             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3616                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3617                    for temporary network outages! */
3618                 spin_unlock_irq(&mdev->tconn->req_lock);
3619                 dev_err(DEV, "Aborting Connect, can not thaw IO with a peer that is only Consistent\n");
3620                 tl_clear(mdev->tconn);
3621                 drbd_uuid_new_current(mdev);
3622                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3623                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3624                 return -EIO;
3625         }
3626         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3627         ns = drbd_read_state(mdev);
3628         spin_unlock_irq(&mdev->tconn->req_lock);
3629
3630         if (rv < SS_SUCCESS) {
3631                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3632                 return -EIO;
3633         }
3634
3635         if (os.conn > C_WF_REPORT_PARAMS) {
3636                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3637                     peer_state.disk != D_NEGOTIATING ) {
3638                         /* we want resync, peer has not yet decided to sync... */
3639                         /* Nowadays only used when forcing a node into primary role and
3640                            setting its disk to UpToDate with that */
3641                         drbd_send_uuids(mdev);
3642                         drbd_send_state(mdev);
3643                 }
3644         }
3645
3646         mdev->tconn->net_conf->want_lose = 0;
3647
3648         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3649
3650         return 0;
3651 }
3652
3653 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3654 {
3655         struct drbd_conf *mdev;
3656         struct p_rs_uuid *p = pi->data;
3657
3658         mdev = vnr_to_mdev(tconn, pi->vnr);
3659         if (!mdev)
3660                 return -EIO;
3661
3662         wait_event(mdev->misc_wait,
3663                    mdev->state.conn == C_WF_SYNC_UUID ||
3664                    mdev->state.conn == C_BEHIND ||
3665                    mdev->state.conn < C_CONNECTED ||
3666                    mdev->state.disk < D_NEGOTIATING);
3667
3668         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3669
3670         /* Here the _drbd_uuid_ functions are right, current should
3671            _not_ be rotated into the history */
3672         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3673                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3674                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3675
3676                 drbd_print_uuids(mdev, "updated sync uuid");
3677                 drbd_start_resync(mdev, C_SYNC_TARGET);
3678
3679                 put_ldev(mdev);
3680         } else
3681                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3682
3683         return 0;
3684 }
3685
3686 /**
3687  * receive_bitmap_plain
3688  *
3689  * Return 0 when done, 1 when another iteration is needed, and a negative error
3690  * code upon failure.
3691  */
3692 static int
3693 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3694                      unsigned long *p, struct bm_xfer_ctx *c)
3695 {
3696         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3697                                  drbd_header_size(mdev->tconn);
3698         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3699                                        c->bm_words - c->word_offset);
3700         unsigned int want = num_words * sizeof(*p);
3701         int err;
3702
3703         if (want != size) {
3704                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3705                 return -EIO;
3706         }
3707         if (want == 0)
3708                 return 0;
3709         err = drbd_recv_all(mdev->tconn, p, want);
3710         if (err)
3711                 return err;
3712
3713         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3714
3715         c->word_offset += num_words;
3716         c->bit_offset = c->word_offset * BITS_PER_LONG;
3717         if (c->bit_offset > c->bm_bits)
3718                 c->bit_offset = c->bm_bits;
3719
3720         return 1;
3721 }
3722
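/* Layout of the p_compressed_bm encoding byte, as decoded by the three
 * helpers below: bits 0-3 hold the bitmap code, bits 4-6 the number of pad
 * bits at the end of the bit stream, and bit 7 the value of the first run. */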
3723 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3724 {
3725         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3726 }
3727
3728 static int dcbp_get_start(struct p_compressed_bm *p)
3729 {
3730         return (p->encoding & 0x80) != 0;
3731 }
3732
3733 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3734 {
3735         return (p->encoding >> 4) & 0x7;
3736 }
3737
3738 /**
3739  * recv_bm_rle_bits
3740  *
3741  * Return 0 when done, 1 when another iteration is needed, and a negative error
3742  * code upon failure.
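 *
 * The payload is a sequence of VLI encoded run lengths of alternating
 * clear/set bit runs; dcbp_get_start() says whether the first run is a run
 * of set bits.  Illustrative example (run lengths assumed): with start=0 and
 * run lengths 5, 3, 4, bits 0..4 stay clear, bits 5..7 are set, and bits
 * 8..11 stay clear.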
3743  */
3744 static int
3745 recv_bm_rle_bits(struct drbd_conf *mdev,
3746                 struct p_compressed_bm *p,
3747                  struct bm_xfer_ctx *c,
3748                  unsigned int len)
3749 {
3750         struct bitstream bs;
3751         u64 look_ahead;
3752         u64 rl;
3753         u64 tmp;
3754         unsigned long s = c->bit_offset;
3755         unsigned long e;
3756         int toggle = dcbp_get_start(p);
3757         int have;
3758         int bits;
3759
3760         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3761
3762         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3763         if (bits < 0)
3764                 return -EIO;
3765
3766         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3767                 bits = vli_decode_bits(&rl, look_ahead);
3768                 if (bits <= 0)
3769                         return -EIO;
3770
3771                 if (toggle) {
3772                         e = s + rl -1;
3773                         if (e >= c->bm_bits) {
3774                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3775                                 return -EIO;
3776                         }
3777                         _drbd_bm_set_bits(mdev, s, e);
3778                 }
3779
3780                 if (have < bits) {
3781                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3782                                 have, bits, look_ahead,
3783                                 (unsigned int)(bs.cur.b - p->code),
3784                                 (unsigned int)bs.buf_len);
3785                         return -EIO;
3786                 }
3787                 look_ahead >>= bits;
3788                 have -= bits;
3789
3790                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3791                 if (bits < 0)
3792                         return -EIO;
3793                 look_ahead |= tmp << have;
3794                 have += bits;
3795         }
3796
3797         c->bit_offset = s;
3798         bm_xfer_ctx_bit_to_word_offset(c);
3799
3800         return (s != c->bm_bits);
3801 }
3802
3803 /**
3804  * decode_bitmap_c
3805  *
3806  * Return 0 when done, 1 when another iteration is needed, and a negative error
3807  * code upon failure.
3808  */
3809 static int
3810 decode_bitmap_c(struct drbd_conf *mdev,
3811                 struct p_compressed_bm *p,
3812                 struct bm_xfer_ctx *c,
3813                 unsigned int len)
3814 {
3815         if (dcbp_get_code(p) == RLE_VLI_Bits)
3816                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3817
3818         /* other variants had been implemented for evaluation,
3819          * but have been dropped as this one turned out to be "best"
3820          * during all our tests. */
3821
3822         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3823         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3824         return -EIO;
3825 }
3826
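/* Illustrative example of the compression percentage computed below (numbers
 * assumed): with plain = 10000 bytes and total = 2000 bytes transferred,
 * r = 1000 * 2000 / 10000 = 200, reported as 1000 - 200 = 800, which prints
 * as "compression: 80.0%". */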
3827 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3828                 const char *direction, struct bm_xfer_ctx *c)
3829 {
3830         /* what would it take to transfer it "plaintext" */
3831         unsigned int header_size = drbd_header_size(mdev->tconn);
3832         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3833         unsigned int plain =
3834                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3835                 c->bm_words * sizeof(unsigned long);
3836         unsigned int total = c->bytes[0] + c->bytes[1];
3837         unsigned int r;
3838
3839         /* total can not be zero. but just in case: */
3840         if (total == 0)
3841                 return;
3842
3843         /* don't report if not compressed */
3844         if (total >= plain)
3845                 return;
3846
3847         /* total < plain. check for overflow, still */
3848         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3849                                     : (1000 * total / plain);
3850
3851         if (r > 1000)
3852                 r = 1000;
3853
3854         r = 1000 - r;
3855         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3856              "total %u; compression: %u.%u%%\n",
3857                         direction,
3858                         c->bytes[1], c->packets[1],
3859                         c->bytes[0], c->packets[0],
3860                         total, r/10, r % 10);
3861 }
3862
3863 /* Since we are processing the bitfield from lower addresses to higher,
3864    it does not matter whether we process it in 32 bit or 64 bit chunks,
3865    as long as it is little endian. (Understand it as a byte stream,
3866    beginning with the lowest byte...) If we used big endian,
3867    we would have to process it from the highest address to the lowest,
3868    in order to be agnostic to the 32 vs 64 bit issue.
3869
3870    returns 0 on success, a negative error code otherwise. */
3871 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3872 {
3873         struct drbd_conf *mdev;
3874         struct bm_xfer_ctx c;
3875         int err;
3876
3877         mdev = vnr_to_mdev(tconn, pi->vnr);
3878         if (!mdev)
3879                 return -EIO;
3880
3881         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3882         /* you are supposed to send additional out-of-sync information
3883          * if you actually set bits during this phase */
3884
3885         c = (struct bm_xfer_ctx) {
3886                 .bm_bits = drbd_bm_bits(mdev),
3887                 .bm_words = drbd_bm_words(mdev),
3888         };
3889
3890         for(;;) {
3891                 if (pi->cmd == P_BITMAP)
3892                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3893                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3894                         /* MAYBE: sanity check that we speak proto >= 90,
3895                          * and the feature is enabled! */
3896                         struct p_compressed_bm *p = pi->data;
3897
3898                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3899                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3900                                 err = -EIO;
3901                                 goto out;
3902                         }
3903                         if (pi->size <= sizeof(*p)) {
3904                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3905                                 err = -EIO;
3906                                 goto out;
3907                         }
3908                         err = drbd_recv_all(mdev->tconn, p, pi->size);
3909                         if (err)
3910                                goto out;
3911                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3912                 } else {
3913                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3914                         err = -EIO;
3915                         goto out;
3916                 }
3917
3918                 c.packets[pi->cmd == P_BITMAP]++;
3919                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3920
3921                 if (err <= 0) {
3922                         if (err < 0)
3923                                 goto out;
3924                         break;
3925                 }
3926                 err = drbd_recv_header(mdev->tconn, pi);
3927                 if (err)
3928                         goto out;
3929         }
3930
3931         INFO_bm_xfer_stats(mdev, "receive", &c);
3932
3933         if (mdev->state.conn == C_WF_BITMAP_T) {
3934                 enum drbd_state_rv rv;
3935
3936                 err = drbd_send_bitmap(mdev);
3937                 if (err)
3938                         goto out;
3939                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3940                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3941                 D_ASSERT(rv == SS_SUCCESS);
3942         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3943                 /* admin may have requested C_DISCONNECTING,
3944                  * other threads may have noticed network errors */
3945                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3946                     drbd_conn_str(mdev->state.conn));
3947         }
3948         err = 0;
3949
3950  out:
3951         drbd_bm_unlock(mdev);
3952         if (!err && mdev->state.conn == C_WF_BITMAP_S)
3953                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3954         return err;
3955 }
3956
3957 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
3958 {
3959         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
3960                  pi->cmd, pi->size);
3961
3962         return ignore_remaining_packet(tconn, pi);
3963 }
3964
3965 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
3966 {
3967         /* Make sure we've acked all the TCP data associated
3968          * with the data requests being unplugged */
3969         drbd_tcp_quickack(tconn->data.socket);
3970
3971         return 0;
3972 }
3973
3974 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
3975 {
3976         struct drbd_conf *mdev;
3977         struct p_block_desc *p = pi->data;
3978
3979         mdev = vnr_to_mdev(tconn, pi->vnr);
3980         if (!mdev)
3981                 return -EIO;
3982
3983         switch (mdev->state.conn) {
3984         case C_WF_SYNC_UUID:
3985         case C_WF_BITMAP_T:
3986         case C_BEHIND:
3987                 break;
3988         default:
3989                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3990                                 drbd_conn_str(mdev->state.conn));
3991         }
3992
3993         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3994
3995         return 0;
3996 }
3997
3998 struct data_cmd {
3999         int expect_payload;
4000         size_t pkt_size;
4001         int (*fn)(struct drbd_tconn *, struct packet_info *);
4002 };
4003
4004 static struct data_cmd drbd_cmd_handler[] = {
4005         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4006         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4007         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4008         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4009         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4010         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4011         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4012         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4013         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4014         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4015         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4016         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4017         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4018         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4019         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4020         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4021         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4022         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4023         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4024         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4025         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4026         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4027         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4028 };
4029
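/* Main receive loop: read a packet header, look up the handler in
 * drbd_cmd_handler[], receive the fixed size sub header (pkt_size bytes, may
 * be zero), and pass the remaining payload size to the handler via pi.size.
 * Any error drops the connection into C_PROTOCOL_ERROR. */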
4030 static void drbdd(struct drbd_tconn *tconn)
4031 {
4032         struct packet_info pi;
4033         size_t shs; /* sub header size */
4034         int err;
4035
4036         while (get_t_state(&tconn->receiver) == RUNNING) {
4037                 struct data_cmd *cmd;
4038
4039                 drbd_thread_current_set_cpu(&tconn->receiver);
4040                 if (drbd_recv_header(tconn, &pi))
4041                         goto err_out;
4042
4043                 cmd = &drbd_cmd_handler[pi.cmd];
4044                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4045                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4046                         goto err_out;
4047                 }
4048
4049                 shs = cmd->pkt_size;
4050                 if (pi.size > shs && !cmd->expect_payload) {
4051                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4052                         goto err_out;
4053                 }
4054
4055                 if (shs) {
4056                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4057                         if (err)
4058                                 goto err_out;
4059                         pi.size -= shs;
4060                 }
4061
4062                 err = cmd->fn(tconn, &pi);
4063                 if (err) {
4064                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4065                                  cmdname(pi.cmd), err, pi.size);
4066                         goto err_out;
4067                 }
4068         }
4069         return;
4070
4071     err_out:
4072         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4073 }
4074
4075 void conn_flush_workqueue(struct drbd_tconn *tconn)
4076 {
4077         struct drbd_wq_barrier barr;
4078
4079         barr.w.cb = w_prev_work_done;
4080         barr.w.tconn = tconn;
4081         init_completion(&barr.done);
4082         drbd_queue_work(&tconn->data.work, &barr.w);
4083         wait_for_completion(&barr.done);
4084 }
4085
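/*
 * Tear down an established connection: stop the asender, close the
 * sockets, run the per-volume cleanup (drbd_disconnected) for every
 * volume, possibly fence the peer, and move the connection through
 * C_UNCONNECTED - or all the way back to C_STANDALONE when we are
 * deliberately disconnecting, in which case net_conf and the HMAC
 * transform are freed as well.
 */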
4086 static void drbd_disconnect(struct drbd_tconn *tconn)
4087 {
4088         enum drbd_conns oc;
4089         int rv = SS_UNKNOWN_ERROR;
4090
4091         if (tconn->cstate == C_STANDALONE)
4092                 return;
4093
4094         /* The asender does not clean up anything; it must not interfere, either. */
4095         drbd_thread_stop(&tconn->asender);
4096         drbd_free_sock(tconn);
4097
4098         down_read(&drbd_cfg_rwsem);
4099         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4100         up_read(&drbd_cfg_rwsem);
4101         conn_info(tconn, "Connection closed\n");
4102
4103         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4104                 conn_try_outdate_peer_async(tconn);
4105
4106         spin_lock_irq(&tconn->req_lock);
4107         oc = tconn->cstate;
4108         if (oc >= C_UNCONNECTED)
4109                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4110
4111         spin_unlock_irq(&tconn->req_lock);
4112
4113         if (oc == C_DISCONNECTING) {
4114                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4115
4116                 crypto_free_hash(tconn->cram_hmac_tfm);
4117                 tconn->cram_hmac_tfm = NULL;
4118
4119                 kfree(tconn->net_conf);
4120                 tconn->net_conf = NULL;
4121                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4122         }
4123 }
4124
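/*
 * Per-volume part of the disconnect: wait for in-flight peer requests,
 * cancel all resync activity and reset the resync counters, flush the
 * worker queue, forget the peer's UUIDs and - unless I/O is suspended -
 * clear the transfer log.  The D_ASSERTs at the end verify that no
 * epoch entries survived the cleanup.
 */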
4125 static int drbd_disconnected(int vnr, void *p, void *data)
4126 {
4127         struct drbd_conf *mdev = (struct drbd_conf *)p;
4128         enum drbd_fencing_p fp;
4129         unsigned int i;
4130
4131         /* wait for current activity to cease. */
4132         spin_lock_irq(&mdev->tconn->req_lock);
4133         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4134         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4135         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4136         spin_unlock_irq(&mdev->tconn->req_lock);
4137
4138         /* We do not have data structures that would allow us to
4139          * get the rs_pending_cnt down to 0 again.
4140          *  * On C_SYNC_TARGET we do not have any data structures describing
4141          *    the pending RSDataRequest's we have sent.
4142          *  * On C_SYNC_SOURCE there is no data structure that tracks
4143          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4144          *  And no, it is not the sum of the reference counts in the
4145          *  resync_LRU. The resync_LRU tracks the whole operation including
4146          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4147          *  on the fly. */
4148         drbd_rs_cancel_all(mdev);
4149         mdev->rs_total = 0;
4150         mdev->rs_failed = 0;
4151         atomic_set(&mdev->rs_pending_cnt, 0);
4152         wake_up(&mdev->misc_wait);
4153
4154         del_timer(&mdev->request_timer);
4155
4156         del_timer_sync(&mdev->resync_timer);
4157         resync_timer_fn((unsigned long)mdev);
4158
4159         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4160          * w_make_resync_request etc. which may still be on the worker queue
4161          * to be "canceled" */
4162         drbd_flush_workqueue(mdev);
4163
4164         drbd_finish_peer_reqs(mdev);
4165
4166         kfree(mdev->p_uuid);
4167         mdev->p_uuid = NULL;
4168
4169         if (!drbd_suspended(mdev))
4170                 tl_clear(mdev->tconn);
4171
4172         drbd_md_sync(mdev);
4173
4174         fp = FP_DONT_CARE;
4175         if (get_ldev(mdev)) {
4176                 fp = mdev->ldev->dc.fencing;
4177                 put_ldev(mdev);
4178         }
4179
4180         /* serialize with bitmap writeout triggered by the state change,
4181          * if any. */
4182         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4183
4184         /* tcp_close and release of sendpage pages can be deferred.  I don't
4185          * want to use SO_LINGER, because apparently it can be deferred for
4186          * more than 20 seconds (longest time I checked).
4187          *
4188          * Actually we don't care for exactly when the network stack does its
4189          * put_page(), but release our reference on these pages right here.
4190          */
4191         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4192         if (i)
4193                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4194         i = atomic_read(&mdev->pp_in_use_by_net);
4195         if (i)
4196                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4197         i = atomic_read(&mdev->pp_in_use);
4198         if (i)
4199                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4200
4201         D_ASSERT(list_empty(&mdev->read_ee));
4202         D_ASSERT(list_empty(&mdev->active_ee));
4203         D_ASSERT(list_empty(&mdev->sync_ee));
4204         D_ASSERT(list_empty(&mdev->done_ee));
4205
4206         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4207         atomic_set(&mdev->current_epoch->epoch_size, 0);
4208         D_ASSERT(list_empty(&mdev->current_epoch->list));
4209
4210         return 0;
4211 }
4212
4213 /*
4214  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4215  * we can agree on is stored in agreed_pro_version.
4216  *
4217  * feature flags and the reserved array should be enough room for future
4218  * enhancements of the handshake protocol, and possible plugins...
4219  *
4220  * for now, they are expected to be zero, but ignored.
4221  */
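/*
 * Negotiation sketch (see drbd_do_features() below): both sides exchange
 * [protocol_min, protocol_max]; a peer that reports protocol_max == 0 is
 * taken to support only protocol_min.  If the ranges overlap, we agree on
 * min(PRO_VERSION_MAX, peer's protocol_max); otherwise the handshake
 * fails with -1 and the connection is expected to go standalone.
 */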
4222 static int drbd_send_features(struct drbd_tconn *tconn)
4223 {
4224         struct drbd_socket *sock;
4225         struct p_connection_features *p;
4226
4227         sock = &tconn->data;
4228         p = conn_prepare_command(tconn, sock);
4229         if (!p)
4230                 return -EIO;
4231         memset(p, 0, sizeof(*p));
4232         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4233         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4234         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4235 }
4236
4237 /*
4238  * return values:
4239  *   1 yes, we have a valid connection
4240  *   0 oops, did not work out, please try again
4241  *  -1 peer talks different language,
4242  *     no point in trying again, please go standalone.
4243  */
4244 static int drbd_do_features(struct drbd_tconn *tconn)
4245 {
4246         /* ASSERT current == tconn->receiver ... */
4247         struct p_connection_features *p;
4248         const int expect = sizeof(struct p_connection_features);
4249         struct packet_info pi;
4250         int err;
4251
4252         err = drbd_send_features(tconn);
4253         if (err)
4254                 return 0;
4255
4256         err = drbd_recv_header(tconn, &pi);
4257         if (err)
4258                 return 0;
4259
4260         if (pi.cmd != P_CONNECTION_FEATURES) {
4261                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4262                      cmdname(pi.cmd), pi.cmd);
4263                 return -1;
4264         }
4265
4266         if (pi.size != expect) {
4267                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4268                      expect, pi.size);
4269                 return -1;
4270         }
4271
4272         p = pi.data;
4273         err = drbd_recv_all_warn(tconn, p, expect);
4274         if (err)
4275                 return 0;
4276
4277         p->protocol_min = be32_to_cpu(p->protocol_min);
4278         p->protocol_max = be32_to_cpu(p->protocol_max);
4279         if (p->protocol_max == 0)
4280                 p->protocol_max = p->protocol_min;
4281
4282         if (PRO_VERSION_MAX < p->protocol_min ||
4283             PRO_VERSION_MIN > p->protocol_max)
4284                 goto incompat;
4285
4286         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4287
4288         conn_info(tconn, "Handshake successful: "
4289              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4290
4291         return 1;
4292
4293  incompat:
4294         conn_err(tconn, "incompatible DRBD dialects: "
4295             "I support %d-%d, peer supports %d-%d\n",
4296             PRO_VERSION_MIN, PRO_VERSION_MAX,
4297             p->protocol_min, p->protocol_max);
4298         return -1;
4299 }
4300
4301 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4302 static int drbd_do_auth(struct drbd_tconn *tconn)
4303 {
4304         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4305         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4306         return -1;
4307 }
4308 #else
4309 #define CHALLENGE_LEN 64
4310
4311 /* Return value:
4312         1 - auth succeeded,
4313         0 - failed, try again (network error),
4314         -1 - auth failed, don't try again.
4315 */
4316
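/*
 * Challenge-response sketch: the cram_hmac_tfm is keyed with the shared
 * secret, CHALLENGE_LEN random bytes go out as P_AUTH_CHALLENGE, and the
 * peer's challenge is expected in return.  We reply with the HMAC of the
 * peer's challenge (P_AUTH_RESPONSE) and verify that the peer's response
 * equals the HMAC of our own challenge; both sides therefore need the
 * same secret and hash algorithm for the final memcmp() to succeed.
 */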
4317 static int drbd_do_auth(struct drbd_tconn *tconn)
4318 {
4319         struct drbd_socket *sock;
4320         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4321         struct scatterlist sg;
4322         char *response = NULL;
4323         char *right_response = NULL;
4324         char *peers_ch = NULL;
4325         unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4326         unsigned int resp_size;
4327         struct hash_desc desc;
4328         struct packet_info pi;
4329         int err, rv;
4330
4331         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4332
4333         desc.tfm = tconn->cram_hmac_tfm;
4334         desc.flags = 0;
4335
4336         rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4337                                 (u8 *)tconn->net_conf->shared_secret, key_len);
4338         if (rv) {
4339                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4340                 rv = -1;
4341                 goto fail;
4342         }
4343
4344         get_random_bytes(my_challenge, CHALLENGE_LEN);
4345
4346         sock = &tconn->data;
4347         if (!conn_prepare_command(tconn, sock)) {
4348                 rv = 0;
4349                 goto fail;
4350         }
4351         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4352                                 my_challenge, CHALLENGE_LEN);
4353         if (!rv)
4354                 goto fail;
4355
4356         err = drbd_recv_header(tconn, &pi);
4357         if (err) {
4358                 rv = 0;
4359                 goto fail;
4360         }
4361
4362         if (pi.cmd != P_AUTH_CHALLENGE) {
4363                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4364                     cmdname(pi.cmd), pi.cmd);
4365                 rv = 0;
4366                 goto fail;
4367         }
4368
4369         if (pi.size > CHALLENGE_LEN * 2) {
4370                 conn_err(tconn, "AuthChallenge payload too big.\n");
4371                 rv = -1;
4372                 goto fail;
4373         }
4374
4375         peers_ch = kmalloc(pi.size, GFP_NOIO);
4376         if (peers_ch == NULL) {
4377                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4378                 rv = -1;
4379                 goto fail;
4380         }
4381
4382         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4383         if (err) {
4384                 rv = 0;
4385                 goto fail;
4386         }
4387
4388         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4389         response = kmalloc(resp_size, GFP_NOIO);
4390         if (response == NULL) {
4391                 conn_err(tconn, "kmalloc of response failed\n");
4392                 rv = -1;
4393                 goto fail;
4394         }
4395
4396         sg_init_table(&sg, 1);
4397         sg_set_buf(&sg, peers_ch, pi.size);
4398
4399         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4400         if (rv) {
4401                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4402                 rv = -1;
4403                 goto fail;
4404         }
4405
4406         if (!conn_prepare_command(tconn, sock)) {
4407                 rv = 0;
4408                 goto fail;
4409         }
4410         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4411                                 response, resp_size);
4412         if (!rv)
4413                 goto fail;
4414
4415         err = drbd_recv_header(tconn, &pi);
4416         if (err) {
4417                 rv = 0;
4418                 goto fail;
4419         }
4420
4421         if (pi.cmd != P_AUTH_RESPONSE) {
4422                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4423                         cmdname(pi.cmd), pi.cmd);
4424                 rv = 0;
4425                 goto fail;
4426         }
4427
4428         if (pi.size != resp_size) {
4429                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4430                 rv = 0;
4431                 goto fail;
4432         }
4433
4434         err = drbd_recv_all_warn(tconn, response, resp_size);
4435         if (err) {
4436                 rv = 0;
4437                 goto fail;
4438         }
4439
4440         right_response = kmalloc(resp_size, GFP_NOIO);
4441         if (right_response == NULL) {
4442                 conn_err(tconn, "kmalloc of right_response failed\n");
4443                 rv = -1;
4444                 goto fail;
4445         }
4446
4447         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4448
4449         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4450         if (rv) {
4451                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4452                 rv = -1;
4453                 goto fail;
4454         }
4455
4456         rv = !memcmp(response, right_response, resp_size);
4457
4458         if (rv)
4459                 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4460                      resp_size, tconn->net_conf->cram_hmac_alg);
4461         else
4462                 rv = -1;
4463
4464  fail:
4465         kfree(peers_ch);
4466         kfree(response);
4467         kfree(right_response);
4468
4469         return rv;
4470 }
4471 #endif
4472
4473 int drbdd_init(struct drbd_thread *thi)
4474 {
4475         struct drbd_tconn *tconn = thi->tconn;
4476         int h;
4477
4478         conn_info(tconn, "receiver (re)started\n");
4479
4480         do {
4481                 h = drbd_connect(tconn);
4482                 if (h == 0) {
4483                         drbd_disconnect(tconn);
4484                         schedule_timeout_interruptible(HZ);
4485                 }
4486                 if (h == -1) {
4487                         conn_warn(tconn, "Discarding network configuration.\n");
4488                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4489                 }
4490         } while (h == 0);
4491
4492         if (h > 0) {
4493                 if (get_net_conf(tconn)) {
4494                         drbdd(tconn);
4495                         put_net_conf(tconn);
4496                 }
4497         }
4498
4499         drbd_disconnect(tconn);
4500
4501         conn_info(tconn, "receiver terminated\n");
4502         return 0;
4503 }
4504
4505 /* ********* acknowledge sender ******** */
4506
4507 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4508 {
4509         struct p_req_state_reply *p = pi->data;
4510         int retcode = be32_to_cpu(p->retcode);
4511
4512         if (retcode >= SS_SUCCESS) {
4513                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4514         } else {
4515                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4516                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4517                          drbd_set_st_err_str(retcode), retcode);
4518         }
4519         wake_up(&tconn->ping_wait);
4520
4521         return 0;
4522 }
4523
4524 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4525 {
4526         struct drbd_conf *mdev;
4527         struct p_req_state_reply *p = pi->data;
4528         int retcode = be32_to_cpu(p->retcode);
4529
4530         mdev = vnr_to_mdev(tconn, pi->vnr);
4531         if (!mdev)
4532                 return -EIO;
4533
4534         if (retcode >= SS_SUCCESS) {
4535                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4536         } else {
4537                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4538                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4539                         drbd_set_st_err_str(retcode), retcode);
4540         }
4541         wake_up(&mdev->state_wait);
4542
4543         return 0;
4544 }
4545
4546 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4547 {
4548         return drbd_send_ping_ack(tconn);
4549
4550 }
4551
4552 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4553 {
4554         /* restore idle timeout */
4555         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4556         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4557                 wake_up(&tconn->ping_wait);
4558
4559         return 0;
4560 }
4561
4562 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4563 {
4564         struct drbd_conf *mdev;
4565         struct p_block_ack *p = pi->data;
4566         sector_t sector = be64_to_cpu(p->sector);
4567         int blksize = be32_to_cpu(p->blksize);
4568
4569         mdev = vnr_to_mdev(tconn, pi->vnr);
4570         if (!mdev)
4571                 return -EIO;
4572
4573         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4574
4575         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4576
4577         if (get_ldev(mdev)) {
4578                 drbd_rs_complete_io(mdev, sector);
4579                 drbd_set_in_sync(mdev, sector, blksize);
4580                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4581                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4582                 put_ldev(mdev);
4583         }
4584         dec_rs_pending(mdev);
4585         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4586
4587         return 0;
4588 }
4589
4590 static int
4591 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4592                               struct rb_root *root, const char *func,
4593                               enum drbd_req_event what, bool missing_ok)
4594 {
4595         struct drbd_request *req;
4596         struct bio_and_error m;
4597
4598         spin_lock_irq(&mdev->tconn->req_lock);
4599         req = find_request(mdev, root, id, sector, missing_ok, func);
4600         if (unlikely(!req)) {
4601                 spin_unlock_irq(&mdev->tconn->req_lock);
4602                 return -EIO;
4603         }
4604         __req_mod(req, what, &m);
4605         spin_unlock_irq(&mdev->tconn->req_lock);
4606
4607         if (m.bio)
4608                 complete_master_bio(mdev, &m);
4609         return 0;
4610 }
4611
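/*
 * Write acks: a block_id of ID_SYNCER marks a resync request, which only
 * updates the bitmap and rs_pending_cnt; everything else is looked up in
 * the write_requests tree and fed to __req_mod() with the request event
 * that matches the ack type (see the switch below).
 */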
4612 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4613 {
4614         struct drbd_conf *mdev;
4615         struct p_block_ack *p = pi->data;
4616         sector_t sector = be64_to_cpu(p->sector);
4617         int blksize = be32_to_cpu(p->blksize);
4618         enum drbd_req_event what;
4619
4620         mdev = vnr_to_mdev(tconn, pi->vnr);
4621         if (!mdev)
4622                 return -EIO;
4623
4624         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4625
4626         if (p->block_id == ID_SYNCER) {
4627                 drbd_set_in_sync(mdev, sector, blksize);
4628                 dec_rs_pending(mdev);
4629                 return 0;
4630         }
4631         switch (pi->cmd) {
4632         case P_RS_WRITE_ACK:
4633                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4634                 break;
4635         case P_WRITE_ACK:
4636                 what = WRITE_ACKED_BY_PEER;
4637                 break;
4638         case P_RECV_ACK:
4639                 what = RECV_ACKED_BY_PEER;
4640                 break;
4641         case P_DISCARD_WRITE:
4642                 what = DISCARD_WRITE;
4643                 break;
4644         case P_RETRY_WRITE:
4645                 what = POSTPONE_WRITE;
4646                 break;
4647         default:
4648                 BUG();
4649         }
4650
4651         return validate_req_change_req_state(mdev, p->block_id, sector,
4652                                              &mdev->write_requests, __func__,
4653                                              what, false);
4654 }
4655
4656 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4657 {
4658         struct drbd_conf *mdev;
4659         struct p_block_ack *p = pi->data;
4660         sector_t sector = be64_to_cpu(p->sector);
4661         int size = be32_to_cpu(p->blksize);
4662         int err;
4663
4664         mdev = vnr_to_mdev(tconn, pi->vnr);
4665         if (!mdev)
4666                 return -EIO;
4667
4668         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4669
4670         if (p->block_id == ID_SYNCER) {
4671                 dec_rs_pending(mdev);
4672                 drbd_rs_failed_io(mdev, sector, size);
4673                 return 0;
4674         }
4675
4676         err = validate_req_change_req_state(mdev, p->block_id, sector,
4677                                             &mdev->write_requests, __func__,
4678                                             NEG_ACKED, true);
4679         if (err) {
4680                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4681                    The master bio might already be completed, therefore the
4682                    request is no longer in the collision hash. */
4683                 /* In Protocol B we might already have got a P_RECV_ACK
4684                    but then get a P_NEG_ACK afterwards. */
4685                 drbd_set_out_of_sync(mdev, sector, size);
4686         }
4687         return 0;
4688 }
4689
4690 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4691 {
4692         struct drbd_conf *mdev;
4693         struct p_block_ack *p = pi->data;
4694         sector_t sector = be64_to_cpu(p->sector);
4695
4696         mdev = vnr_to_mdev(tconn, pi->vnr);
4697         if (!mdev)
4698                 return -EIO;
4699
4700         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4701
4702         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4703             (unsigned long long)sector, be32_to_cpu(p->blksize));
4704
4705         return validate_req_change_req_state(mdev, p->block_id, sector,
4706                                              &mdev->read_requests, __func__,
4707                                              NEG_ACKED, false);
4708 }
4709
4710 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4711 {
4712         struct drbd_conf *mdev;
4713         sector_t sector;
4714         int size;
4715         struct p_block_ack *p = pi->data;
4716
4717         mdev = vnr_to_mdev(tconn, pi->vnr);
4718         if (!mdev)
4719                 return -EIO;
4720
4721         sector = be64_to_cpu(p->sector);
4722         size = be32_to_cpu(p->blksize);
4723
4724         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4725
4726         dec_rs_pending(mdev);
4727
4728         if (get_ldev_if_state(mdev, D_FAILED)) {
4729                 drbd_rs_complete_io(mdev, sector);
4730                 switch (pi->cmd) {
4731                 case P_NEG_RS_DREPLY:
4732                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
4733                 case P_RS_CANCEL:
4734                         break;
4735                 default:
4736                         BUG();
4737                 }
4738                 put_ldev(mdev);
4739         }
4740
4741         return 0;
4742 }
4743
4744 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4745 {
4746         struct drbd_conf *mdev;
4747         struct p_barrier_ack *p = pi->data;
4748
4749         mdev = vnr_to_mdev(tconn, pi->vnr);
4750         if (!mdev)
4751                 return -EIO;
4752
4753         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4754
4755         if (mdev->state.conn == C_AHEAD &&
4756             atomic_read(&mdev->ap_in_flight) == 0 &&
4757             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4758                 mdev->start_resync_timer.expires = jiffies + HZ;
4759                 add_timer(&mdev->start_resync_timer);
4760         }
4761
4762         return 0;
4763 }
4764
4765 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4766 {
4767         struct drbd_conf *mdev;
4768         struct p_block_ack *p = pi->data;
4769         struct drbd_work *w;
4770         sector_t sector;
4771         int size;
4772
4773         mdev = vnr_to_mdev(tconn, pi->vnr);
4774         if (!mdev)
4775                 return -EIO;
4776
4777         sector = be64_to_cpu(p->sector);
4778         size = be32_to_cpu(p->blksize);
4779
4780         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4781
4782         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4783                 drbd_ov_out_of_sync_found(mdev, sector, size);
4784         else
4785                 ov_out_of_sync_print(mdev);
4786
4787         if (!get_ldev(mdev))
4788                 return 0;
4789
4790         drbd_rs_complete_io(mdev, sector);
4791         dec_rs_pending(mdev);
4792
4793         --mdev->ov_left;
4794
4795         /* let's advance progress step marks only for every other megabyte */
4796         if ((mdev->ov_left & 0x200) == 0x200)
4797                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4798
4799         if (mdev->ov_left == 0) {
4800                 w = kmalloc(sizeof(*w), GFP_NOIO);
4801                 if (w) {
4802                         w->cb = w_ov_finished;
4803                         w->mdev = mdev;
4804                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4805                 } else {
4806                         dev_err(DEV, "kmalloc(w) failed.\n");
4807                         ov_out_of_sync_print(mdev);
4808                         drbd_resync_finished(mdev);
4809                 }
4810         }
4811         put_ldev(mdev);
4812         return 0;
4813 }
4814
4815 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4816 {
4817         return 0;
4818 }
4819
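/*
 * Flush the done_ee list of every volume, i.e. send the pending acks.
 * Called from the asender while the meta socket is corked, so that the
 * acks go out in as few segments as possible.  Returns nonzero if
 * sending an ack failed.
 */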
4820 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4821 {
4822         struct drbd_conf *mdev;
4823         int i, not_empty = 0;
4824
4825         do {
4826                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4827                 flush_signals(current);
4828                 down_read(&drbd_cfg_rwsem);
4829                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4830                         if (drbd_finish_peer_reqs(mdev)) {
4831                                 up_read(&drbd_cfg_rwsem);
4832                                 return 1; /* error */
4833                         }
4834                 }
4835                 up_read(&drbd_cfg_rwsem);
4836                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4837
4838                 spin_lock_irq(&tconn->req_lock);
4839                 rcu_read_lock();
4840                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4841                         not_empty = !list_empty(&mdev->done_ee);
4842                         if (not_empty)
4843                                 break;
4844                 }
4845                 rcu_read_unlock();
4846                 spin_unlock_irq(&tconn->req_lock);
4847         } while (not_empty);
4848
4849         return 0;
4850 }
4851
4852 struct asender_cmd {
4853         size_t pkt_size;
4854         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4855 };
4856
4857 static struct asender_cmd asender_tbl[] = {
4858         [P_PING]            = { 0, got_Ping },
4859         [P_PING_ACK]        = { 0, got_PingAck },
4860         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4861         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4862         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4863         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4864         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4865         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4866         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4867         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4868         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4869         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4870         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4871         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4872         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4873         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
4874         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4875 };
4876
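/*
 * The asender ("acknowledge sender") runs on the meta socket: it sends
 * pings and the acks queued on done_ee, and receives the small fixed-size
 * packets listed in asender_tbl[] above.  A PingAck that does not arrive
 * within the ping timeout, or any receive error, forces C_NETWORK_FAILURE;
 * an unknown packet forces C_DISCONNECTING.
 */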
4877 int drbd_asender(struct drbd_thread *thi)
4878 {
4879         struct drbd_tconn *tconn = thi->tconn;
4880         struct asender_cmd *cmd = NULL;
4881         struct packet_info pi;
4882         int rv;
4883         void *buf    = tconn->meta.rbuf;
4884         int received = 0;
4885         unsigned int header_size = drbd_header_size(tconn);
4886         int expect   = header_size;
4887         int ping_timeout_active = 0;
4888
4889         current->policy = SCHED_RR;  /* Make this a realtime task! */
4890         current->rt_priority = 2;    /* more important than all other tasks */
4891
4892         while (get_t_state(thi) == RUNNING) {
4893                 drbd_thread_current_set_cpu(thi);
4894                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4895                         if (drbd_send_ping(tconn)) {
4896                                 conn_err(tconn, "drbd_send_ping has failed\n");
4897                                 goto reconnect;
4898                         }
4899                         tconn->meta.socket->sk->sk_rcvtimeo =
4900                                 tconn->net_conf->ping_timeo*HZ/10;
4901                         ping_timeout_active = 1;
4902                 }
4903
4904                 /* TODO: conditionally cork; it may hurt latency if we cork without
4905                    much to send */
4906                 if (!tconn->net_conf->no_cork)
4907                         drbd_tcp_cork(tconn->meta.socket);
4908                 if (tconn_finish_peer_reqs(tconn)) {
4909                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
4910                         goto reconnect;
4911                 }
4912                 /* but unconditionally uncork unless disabled */
4913                 if (!tconn->net_conf->no_cork)
4914                         drbd_tcp_uncork(tconn->meta.socket);
4915
4916                 /* short circuit, recv_msg would return EINTR anyways. */
4917                 if (signal_pending(current))
4918                         continue;
4919
4920                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
4921                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4922
4923                 flush_signals(current);
4924
4925                 /* Note:
4926                  * -EINTR        (on meta) we got a signal
4927                  * -EAGAIN       (on meta) rcvtimeo expired
4928                  * -ECONNRESET   other side closed the connection
4929                  * -ERESTARTSYS  (on data) we got a signal
4930                  * rv <  0       other than above: unexpected error!
4931                  * rv == expected: full header or command
4932                  * rv <  expected: "woken" by signal during receive
4933                  * rv == 0       : "connection shut down by peer"
4934                  */
4935                 if (likely(rv > 0)) {
4936                         received += rv;
4937                         buf      += rv;
4938                 } else if (rv == 0) {
4939                         conn_err(tconn, "meta connection shut down by peer.\n");
4940                         goto reconnect;
4941                 } else if (rv == -EAGAIN) {
4942                         /* If the data socket received something meanwhile,
4943                          * that is good enough: peer is still alive. */
4944                         if (time_after(tconn->last_received,
4945                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
4946                                 continue;
4947                         if (ping_timeout_active) {
4948                                 conn_err(tconn, "PingAck did not arrive in time.\n");
4949                                 goto reconnect;
4950                         }
4951                         set_bit(SEND_PING, &tconn->flags);
4952                         continue;
4953                 } else if (rv == -EINTR) {
4954                         continue;
4955                 } else {
4956                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
4957                         goto reconnect;
4958                 }
4959
4960                 if (received == expect && cmd == NULL) {
4961                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
4962                                 goto reconnect;
4963                         cmd = &asender_tbl[pi.cmd];
4964                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
4965                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
4966                                         pi.cmd, pi.size);
4967                                 goto disconnect;
4968                         }
4969                         expect = header_size + cmd->pkt_size;
4970                         if (pi.size != expect - header_size) {
4971                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
4972                                         pi.cmd, pi.size);
4973                                 goto reconnect;
4974                         }
4975                 }
4976                 if (received == expect) {
4977                         bool err;
4978
4979                         err = cmd->fn(tconn, &pi);
4980                         if (err) {
4981                                 conn_err(tconn, "%pf failed\n", cmd->fn);
4982                                 goto reconnect;
4983                         }
4984
4985                         tconn->last_received = jiffies;
4986
4987                         /* the idle_timeout (ping-int)
4988                          * has been restored in got_PingAck() */
4989                         if (cmd == &asender_tbl[P_PING_ACK])
4990                                 ping_timeout_active = 0;
4991
4992                         buf      = tconn->meta.rbuf;
4993                         received = 0;
4994                         expect   = header_size;
4995                         cmd      = NULL;
4996                 }
4997         }
4998
4999         if (0) {
5000 reconnect:
5001                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5002         }
5003         if (0) {
5004 disconnect:
5005                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5006         }
5007         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5008
5009         conn_info(tconn, "asender terminated\n");
5010
5011         return 0;
5012 }